diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 515de08ef173..2b7a6f7ac57e 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -997,15 +997,18 @@ ss::future> controller_backend::reconcile_ntp_step( = (updated_shard_on_cur_node == ss::this_shard_id()); } + const bool is_disabled = _topics.local().is_disabled(ntp); + vlog( clusterlog.trace, "[{}] orig_shard_on_cur_node: {}, updated_shard_on_cur_node: {}, " - "expected_log_rev: {}, expected_on_cur_shard: {}", + "expected_log_rev: {}, expected_on_cur_shard: {}, is_disabled: {}", ntp, orig_shard_on_cur_node, updated_shard_on_cur_node, expected_log_rev, - expected_on_cur_shard); + expected_on_cur_shard, + is_disabled); auto partition = _partition_manager.local().get(ntp); auto claim_it = _ntp_claims.find(ntp); @@ -1084,9 +1087,10 @@ ss::future> controller_backend::reconcile_ntp_step( // After this point the partition is expected to exist on this node. - if (!expected_on_cur_shard) { - // Partition is expected to exist on some other shard. If we still own - // it, shut it down and make it available for cross-shard move. + if (!expected_on_cur_shard || is_disabled) { + // Partition is disabled or expected to exist on some other shard. If we + // still own it, shut it down and make it available for a possible + // cross-shard move. if (partition) { rs.set_cur_operation( diff --git a/src/v/cluster/controller_snapshot.cc b/src/v/cluster/controller_snapshot.cc index 54342e2f1da2..6803406ad84f 100644 --- a/src/v/cluster/controller_snapshot.cc +++ b/src/v/cluster/controller_snapshot.cc @@ -106,6 +106,7 @@ ss::future<> topics_t::topic_t::serde_async_write(iobuf& out) { serde::write(out, metadata); co_await write_map_async(out, std::move(partitions)); co_await write_map_async(out, std::move(updates)); + serde::write(out, disabled_set); } ss::future<> @@ -115,6 +116,10 @@ topics_t::topic_t::serde_async_read(iobuf_parser& in, serde::header const h) { in, h._bytes_left_limit); updates = co_await read_map_async_nested( in, h._bytes_left_limit); + if (h._version >= 1) { + disabled_set = serde::read_nested( + in, h._bytes_left_limit); + } if (in.bytes_left() > h._bytes_left_limit) { in.skip(in.bytes_left() - h._bytes_left_limit); @@ -125,7 +130,6 @@ ss::future<> topics_t::serde_async_write(iobuf& out) { co_await write_map_async(out, std::move(topics)); serde::write(out, highest_group_id); co_await write_map_async(out, std::move(lifecycle_markers)); - co_await write_map_async(out, std::move(disabled_partitions)); } ss::future<> @@ -137,11 +141,6 @@ topics_t::serde_async_read(iobuf_parser& in, serde::header const h) { lifecycle_markers = co_await read_map_async_nested( in, h._bytes_left_limit); - if (h._version >= 1) { - disabled_partitions - = co_await read_map_async_nested( - in, h._bytes_left_limit); - } if (in.bytes_left() > h._bytes_left_limit) { in.skip(in.bytes_left() - h._bytes_left_limit); diff --git a/src/v/cluster/controller_snapshot.h b/src/v/cluster/controller_snapshot.h index 77ecf564eecf..90bdcb2cd980 100644 --- a/src/v/cluster/controller_snapshot.h +++ b/src/v/cluster/controller_snapshot.h @@ -114,7 +114,7 @@ struct config_t struct topics_t : public serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { // NOTE: layout here is a bit different than in the topic table because it // allows more compact storage and more convenient generation of controller // backend deltas when applying the snapshot. 
@@ -164,10 +164,11 @@ struct topics_t struct topic_t : public serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_metadata_fields metadata; absl::node_hash_map partitions; absl::node_hash_map updates; + std::optional disabled_set; friend bool operator==(const topic_t&, const topic_t&) = default; @@ -185,13 +186,6 @@ struct topics_t nt_revision_eq> lifecycle_markers; - absl::node_hash_map< - model::topic_namespace, - topic_disabled_partitions_set, - model::topic_namespace_hash, - model::topic_namespace_eq> - disabled_partitions; - friend bool operator==(const topics_t&, const topics_t&) = default; ss::future<> serde_async_write(iobuf&); diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h index 66af5d322a63..a692dd980b5b 100644 --- a/src/v/cluster/errc.h +++ b/src/v/cluster/errc.h @@ -74,6 +74,9 @@ enum class errc : int16_t { transform_invalid_source, transform_invalid_environment, trackable_keys_limit_exceeded, + topic_disabled, + partition_disabled, + invalid_partition_operation, }; struct errc_category final : public std::error_category { const char* name() const noexcept final { return "cluster::errc"; } @@ -214,6 +217,12 @@ struct errc_category final : public std::error_category { return "Invalid transform environment"; case errc::trackable_keys_limit_exceeded: return "Too many keys are currently tracked, no space for more."; + case errc::topic_disabled: + return "Topic disabled by user"; + case errc::partition_disabled: + return "Partition disabled by user"; + case errc::invalid_partition_operation: + return "Invalid partition operation"; } return "cluster::errc::unknown"; } diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 33ef12e7f008..304ddf5582e1 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -362,4 +362,14 @@ bool metadata_cache::is_update_in_progress(const model::ntp& ntp) const { return _topics_state.local().is_update_in_progress(ntp); } +bool metadata_cache::is_disabled( + model::topic_namespace_view ns_tp, model::partition_id p_id) const { + return _topics_state.local().is_disabled(ns_tp, p_id); +} + +const topic_disabled_partitions_set* metadata_cache::get_topic_disabled_set( + model::topic_namespace_view ns_tp) const { + return _topics_state.local().get_topic_disabled_set(ns_tp); +} + } // namespace cluster diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index 45dcdab6f7ca..93d703236c60 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -200,6 +200,10 @@ class metadata_cache { const topic_table::updates_t& updates_in_progress() const; bool is_update_in_progress(const model::ntp& ntp) const; + bool is_disabled(model::topic_namespace_view, model::partition_id) const; + const topic_disabled_partitions_set* + get_topic_disabled_set(model::topic_namespace_view) const; + private: ss::sharded& _topics_state; ss::sharded& _members_table; diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index 55739ccefcfd..c142cde3b5b2 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -668,6 +668,8 @@ class partition_balancer_planner::immutable_partition { reconfiguration_state, // can't add more actions batch_full, + // disabled by user + disabled, }; immutability_reason reason() const { return _reason; } @@ -691,6 +693,9 @@ class partition_balancer_planner::immutable_partition { reason = ssx::sformat( 
"reconfiguration in progress, state: {}", _reconfiguration_state); break; + case immutability_reason::disabled: + reason = "partition disabled by user"; + break; } const bool can_log = _ctx.increment_failure_count(); @@ -767,6 +772,8 @@ auto partition_balancer_planner::request_context::do_with_partition( const model::ntp& ntp, const std::vector& orig_replicas, Visitor& visitor) { + const bool is_disabled = _parent._state.topics().is_disabled(ntp); + auto in_progress_it = _parent._state.topics().updates_in_progress().find( ntp); if (in_progress_it != _parent._state.topics().updates_in_progress().end()) { @@ -775,6 +782,16 @@ auto partition_balancer_planner::request_context::do_with_partition( = in_progress_it->second.get_previous_replicas(); auto state = in_progress_it->second.get_state(); + if (is_disabled) { + partition part{immutable_partition{ + ntp, + replicas, + immutable_partition::immutability_reason::disabled, + state, + *this}}; + return visitor(part); + } + if (state == reconfiguration_state::in_progress) { if (can_add_cancellation()) { partition part{ @@ -800,6 +817,16 @@ auto partition_balancer_planner::request_context::do_with_partition( } } + if (is_disabled) { + partition part{immutable_partition{ + ntp, + orig_replicas, + immutable_partition::immutability_reason::disabled, + std::nullopt, + *this}}; + return visitor(part); + } + auto reassignment_it = _reassignments.find(ntp); if (reassignment_it == _reassignments.end() && !can_add_reassignment()) { diff --git a/src/v/cluster/plugin_frontend.cc b/src/v/cluster/plugin_frontend.cc index 9105c35e81c0..3dab6ac192c3 100644 --- a/src/v/cluster/plugin_frontend.cc +++ b/src/v/cluster/plugin_frontend.cc @@ -457,6 +457,14 @@ errc plugin_frontend::validator::validate_mutation(const transform_cmd& cmd) { loggable_string(cmd.value.input_topic.tp())); return errc::transform_invalid_create; } + if (_topics->is_fully_disabled(cmd.value.input_topic)) { + vlog( + clusterlog.info, + "attempted to deploy transform {} to a disabled topic {}", + cmd.value.name, + loggable_string(cmd.value.input_topic.tp())); + return errc::transform_invalid_create; + } if (cmd.value.output_topics.empty()) { vlog( clusterlog.info, @@ -534,6 +542,15 @@ errc plugin_frontend::validator::validate_mutation(const transform_cmd& cmd) { out_name.tp); return errc::transform_invalid_create; } + if (_topics->is_fully_disabled(out_name)) { + vlog( + clusterlog.info, + "attempted to deploy transform {} to write to a disabled " + "topic {}", + cmd.value.name, + loggable_string(out_name.tp())); + return errc::transform_invalid_create; + } if (would_cause_cycle(cmd.value.input_topic, out_name)) { vlog( clusterlog.info, diff --git a/src/v/cluster/rm_partition_frontend.cc b/src/v/cluster/rm_partition_frontend.cc index c1c574b87e17..5d43424fe089 100644 --- a/src/v/cluster/rm_partition_frontend.cc +++ b/src/v/cluster/rm_partition_frontend.cc @@ -74,6 +74,11 @@ ss::future rm_partition_frontend::begin_tx( co_return begin_tx_reply{ntp, tx_errc::partition_not_exists}; } + if (_metadata_cache.local().is_disabled(nt, ntp.tp.partition)) { + vlog(txlog.warn, "partition {} is disabled by user", ntp); + co_return begin_tx_reply{ntp, tx_errc::partition_disabled}; + } + auto leader_opt = _leaders.local().get_leader(ntp); if (!leader_opt) { vlog(txlog.warn, "{} is leaderless", ntp); @@ -264,6 +269,11 @@ ss::future rm_partition_frontend::commit_tx( commit_tx_reply{tx_errc::partition_not_exists}); } + if (_metadata_cache.local().is_disabled(nt, ntp.tp.partition)) { + return ss::make_ready_future( 
+ commit_tx_reply{tx_errc::partition_disabled}); + } + auto leader = _leaders.local().get_leader(ntp); if (!leader) { vlog(txlog.warn, "can't find a leader for {} pid:{}", ntp, pid); @@ -402,6 +412,11 @@ ss::future rm_partition_frontend::abort_tx( abort_tx_reply{tx_errc::partition_not_exists}); } + if (_metadata_cache.local().is_disabled(nt, ntp.tp.partition)) { + return ss::make_ready_future( + abort_tx_reply{tx_errc::partition_disabled}); + } + auto leader = _leaders.local().get_leader(ntp); if (!leader) { vlog(txlog.warn, "can't find a leader for {}", ntp); diff --git a/src/v/cluster/scheduling/leader_balancer.cc b/src/v/cluster/scheduling/leader_balancer.cc index 6f02a8376563..0b71b9988855 100644 --- a/src/v/cluster/scheduling/leader_balancer.cc +++ b/src/v/cluster/scheduling/leader_balancer.cc @@ -636,6 +636,7 @@ leader_balancer::index_type leader_balancer::build_index() { // for each ntp in the cluster for (const auto& topic : _topics.topics_map()) { + const auto* disabled_set = _topics.get_topic_disabled_set(topic.first); for (const auto& partition : topic.second.get_assignments()) { if (partition.replicas.empty()) { vlog( @@ -656,6 +657,12 @@ leader_balancer::index_type leader_balancer::build_index() { continue; } + if (disabled_set && disabled_set->is_disabled(partition.id)) { + // skip balancing disabled partitions, as they shouldn't have + // leaders anyway + continue; + } + /* * if the partition group is a part of our in flight changes * then assume that leadership will be transferred to the target diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index add70e008836..0653777494c5 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -204,6 +204,10 @@ topic_table::apply(create_partition_cmd cmd, model::offset offset) { co_return errc::topic_not_exists; } + if (is_fully_disabled(cmd.key)) { + co_return errc::topic_disabled; + } + // add partitions auto prev_partition_count = tp->second.get_configuration().partition_count; // update partitions count @@ -266,6 +270,10 @@ ss::future topic_table::do_apply( errc::partition_not_exists); } + if (is_disabled(cmd_data.ntp)) { + return ss::make_ready_future(errc::partition_disabled); + } + if (_updates_in_progress.contains(cmd_data.ntp)) { return ss::make_ready_future(errc::update_in_progress); } @@ -386,6 +394,10 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) { co_return errc::partition_not_exists; } + if (is_disabled(cmd.key)) { + co_return errc::partition_disabled; + } + // update must be in progress to be able to cancel it auto in_progress_it = _updates_in_progress.find(cmd.key); if (in_progress_it == _updates_in_progress.end()) { @@ -503,34 +515,45 @@ topic_table::apply(move_topic_replicas_cmd cmd, model::offset o) { // We should check partition before create updates - if (std::any_of( - cmd.value.begin(), - cmd.value.end(), - [&tp](const auto& partition_and_replicas) { - return !tp->second.get_assignments().contains( - partition_and_replicas.partition); - })) { + const auto* disabled_set = get_topic_disabled_set(cmd.key); + if (disabled_set && disabled_set->is_fully_disabled()) { vlog( clusterlog.warn, - "topic: {}: Can not move replicas, becasue can not find " - "partitions", + "topic {}: Can not move replicas, topic disabled", cmd.key); - co_return errc::partition_not_exists; + co_return errc::topic_disabled; } - if (std::any_of( - cmd.value.begin(), - cmd.value.end(), - [this, key = cmd.key](const auto& partition_and_replicas) { - return 
_updates_in_progress.contains( - model::ntp(key.ns, key.tp, partition_and_replicas.partition)); - })) { - vlog( - clusterlog.warn, - "topic: {}: Can not move replicas for topic, some updates in " - "progress", - cmd.key); - co_return errc::update_in_progress; + for (const auto& partition_and_replicas : cmd.value) { + auto partition = partition_and_replicas.partition; + if (!tp->second.get_assignments().contains(partition)) { + vlog( + clusterlog.warn, + "topic {}: Can not move replicas, partition {} not found", + cmd.key, + partition); + co_return errc::partition_not_exists; + } + + if (disabled_set && disabled_set->is_disabled(partition)) { + vlog( + clusterlog.warn, + "topic {}: Can not move replicas, partition {} disabled", + cmd.key, + partition); + co_return errc::partition_disabled; + } + + if (_updates_in_progress.contains( + model::ntp{cmd.key.ns, cmd.key.tp, partition})) { + vlog( + clusterlog.warn, + "topic {}: Can not move replicas, update for partition {} is in " + "progress", + cmd.key, + partition); + co_return errc::update_in_progress; + } } for (const auto& [partition_id, new_replicas] : cmd.value) { @@ -567,6 +590,10 @@ topic_table::apply(force_partition_reconfiguration_cmd cmd, model::offset o) { errc::partition_not_exists); } + if (is_disabled(cmd.key)) { + return ss::make_ready_future(errc::partition_disabled); + } + if (auto it = _updates_in_progress.find(cmd.key); it != _updates_in_progress.end()) { return ss::make_ready_future(errc::update_in_progress); @@ -596,9 +623,9 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) { if (topic_it == _topics.end()) { co_return errc::topic_not_exists; } + const auto& assignments = topic_it->second.get_assignments(); if (cmd.value.partition_id) { - const auto& assignments = topic_it->second.get_assignments(); if (!assignments.contains(*cmd.value.partition_id)) { co_return errc::partition_not_exists; } @@ -616,14 +643,41 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) { if (disabled_set.is_fully_enabled()) { _disabled_partitions.erase(disabled_it); } + + _pending_deltas.emplace_back( + model::ntp{ + cmd.value.ns_tp.ns, cmd.value.ns_tp.tp, *cmd.value.partition_id}, + model::revision_id{o}, + topic_table_delta_type::disabled_flag_updated); } else { + topic_disabled_partitions_set old_disabled_set; if (cmd.value.disabled) { - _disabled_partitions[cmd.value.ns_tp].set_fully_disabled(); + auto& disabled_set = _disabled_partitions[cmd.value.ns_tp]; + old_disabled_set = std::exchange( + disabled_set, topic_disabled_partitions_set{}); + disabled_set.set_fully_disabled(); } else { - _disabled_partitions.erase(cmd.value.ns_tp); + auto it = _disabled_partitions.find(cmd.value.ns_tp); + if (it != _disabled_partitions.end()) { + old_disabled_set = std::move(it->second); + _disabled_partitions.erase(it); + } + } + + for (const auto& p_as : assignments) { + if (old_disabled_set.is_disabled(p_as.id) == cmd.value.disabled) { + continue; + } + + _pending_deltas.emplace_back( + model::ntp{cmd.value.ns_tp.ns, cmd.value.ns_tp.tp, p_as.id}, + model::revision_id{o}, + topic_table_delta_type::disabled_flag_updated); } } + notify_waiters(); + co_return errc::success; } @@ -852,29 +906,32 @@ topic_table::fill_snapshot(controller_snapshot& controller_snap) const { co_await ss::coroutine::maybe_yield(); } + std::optional disabled_set; + if (auto it = _disabled_partitions.find(ns_tp); + it != _disabled_partitions.end()) { + disabled_set = it->second; + } + snap.topics.emplace( ns_tp, 
controller_snapshot_parts::topics_t::topic_t{ .metadata = md_item.metadata.get_fields(), .partitions = std::move(partitions), .updates = std::move(updates), + .disabled_set = std::move(disabled_set), }); } for (const auto& [ntr, lm] : _lifecycle_markers) { snap.lifecycle_markers.emplace(ntr, lm); } - - for (const auto& [ns_tp, dps] : _disabled_partitions) { - snap.disabled_partitions.emplace(ns_tp, dps); - co_await ss::coroutine::maybe_yield(); - } } // helper class to hold context needed for adding/deleting ntps when applying a // controller snapshot class topic_table::snapshot_applier { updates_t& _updates_in_progress; + disabled_partitions_t& _disabled_partitions; fragmented_vector& _pending_deltas; topic_table_probe& _probe; model::revision_id _snap_revision; @@ -882,6 +939,7 @@ class topic_table::snapshot_applier { public: snapshot_applier(topic_table& parent, model::revision_id snap_revision) : _updates_in_progress(parent._updates_in_progress) + , _disabled_partitions(parent._disabled_partitions) , _pending_deltas(parent._pending_deltas) , _probe(parent._probe) , _snap_revision(snap_revision) {} @@ -910,6 +968,7 @@ class topic_table::snapshot_applier { delete_ntp(ns_tp, p_as); co_await ss::coroutine::maybe_yield(); } + _disabled_partitions.erase(ns_tp); _probe.handle_topic_deletion(ns_tp); // topic_metadata_item object is supposed to be removed from _topics by // the caller @@ -1054,6 +1113,9 @@ class topic_table::snapshot_applier { const model::topic_namespace& ns_tp, const controller_snapshot_parts::topics_t::topic_t& topic) { topic_metadata_item ret{topic_metadata{topic.metadata, {}}}; + if (topic.disabled_set) { + _disabled_partitions[ns_tp] = *topic.disabled_set; + } for (const auto& [p_id, partition] : topic.partitions) { auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id); add_ntp(ntp, topic, partition, ret, false); @@ -1104,6 +1166,17 @@ ss::future<> topic_table::apply_snapshot( md_item.metadata.get_fields() = topic_snapshot.metadata; + topic_disabled_partitions_set old_disabled_set; + if (topic_snapshot.disabled_set) { + old_disabled_set = std::exchange( + _disabled_partitions[ns_tp], + *topic_snapshot.disabled_set); + } else if (auto it = _disabled_partitions.find(ns_tp); + it != _disabled_partitions.end()) { + old_disabled_set = std::move(it->second); + _disabled_partitions.erase(it); + } + // 2. For each partition in the new set, reconcile assignments // and add corresponding deltas for (const auto& [p_id, partition] : @@ -1115,6 +1188,17 @@ ss::future<> topic_table::apply_snapshot( partition, md_item, must_update_properties); + + const bool new_is_disabled + = topic_snapshot.disabled_set + && topic_snapshot.disabled_set->is_disabled(p_id); + if (old_disabled_set.is_disabled(p_id) != new_is_disabled) { + _pending_deltas.emplace_back( + ntp, + snap_revision, + topic_table_delta_type::disabled_flag_updated); + } + co_await ss::coroutine::maybe_yield(); } @@ -1155,9 +1239,6 @@ ss::future<> topic_table::apply_snapshot( // etc, so we can just copy directly into place. _lifecycle_markers = controller_snap.topics.lifecycle_markers; - // Same for disabled partitions. - _disabled_partitions = controller_snap.topics.disabled_partitions; - // 2. 
re-calculate derived state _partition_count = 0; diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index f4a9fbc3469d..08a86ac321e4 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -269,6 +269,10 @@ ss::future topics_frontend::do_update_replication_factor( co_return cluster::errc::feature_disabled; } + if (_topics.local().is_fully_disabled(update.tp_ns)) { + co_return errc::topic_disabled; + } + auto value = update.custom_properties.replication_factor.value; if ( !value.has_value() @@ -802,6 +806,9 @@ ss::future topics_frontend::move_partition_replicas( co_return result.error(); } + if (_topics.local().is_disabled(ntp)) { + co_return errc::partition_disabled; + } if (_topics.local().is_update_in_progress(ntp)) { co_return errc::update_in_progress; } @@ -845,6 +852,9 @@ ss::future topics_frontend::force_update_partition_replicas( if (!result) { co_return result.error(); } + if (_topics.local().is_disabled(ntp)) { + co_return errc::partition_disabled; + } force_partition_reconfiguration_cmd cmd{ std::move(ntp), force_partition_reconfiguration_cmd_data{std::move(new_replica_set)}}; @@ -861,6 +871,9 @@ ss::future topics_frontend::cancel_moving_partition_replicas( if (!result) { co_return result.error(); } + if (_topics.local().is_disabled(ntp)) { + co_return errc::partition_disabled; + } if (!_topics.local().is_update_in_progress(ntp)) { co_return errc::no_update_in_progress; } @@ -882,6 +895,9 @@ ss::future topics_frontend::abort_moving_partition_replicas( co_return result.error(); } + if (_topics.local().is_disabled(ntp)) { + co_return errc::partition_disabled; + } if (!_topics.local().is_update_in_progress(ntp)) { co_return errc::no_update_in_progress; } @@ -1021,6 +1037,10 @@ ss::future topics_frontend::set_topic_partitions_disabled( co_return errc::feature_disabled; } + if (!model::is_user_topic(ns_tp)) { + co_return errc::invalid_partition_operation; + } + auto r = co_await stm_linearizable_barrier(timeout); if (!r) { co_return r.error(); @@ -1099,6 +1119,9 @@ ss::future topics_frontend::do_create_partition( co_return make_error_result( p_cfg.tp_ns, errc::topic_invalid_partitions); } + if (_topics.local().is_fully_disabled(p_cfg.tp_ns)) { + co_return make_error_result(p_cfg.tp_ns, errc::topic_disabled); + } auto units = co_await _allocator.invoke_on( partition_allocator::shard, @@ -1247,6 +1270,10 @@ ss::future topics_frontend::change_replication_factor( co_return errc::success; } + if (_topics.local().is_fully_disabled(topic)) { + co_return errc::topic_disabled; + } + if (new_replication_factor < current_replication_factor) { co_return co_await decrease_replication_factor( topic, new_replication_factor, timeout); @@ -1360,6 +1387,10 @@ ss::future topics_frontend::increase_replication_factor( co_return errc::topic_not_exists; } + if (_topics.local().is_fully_disabled(topic)) { + co_return errc::topic_disabled; + } + auto partition_count = tp_metadata->get_configuration().partition_count; // units shold exist during replicate_and_wait call @@ -1465,6 +1496,10 @@ ss::future topics_frontend::decrease_replication_factor( co_return errc::topic_not_exists; } + if (_topics.local().is_fully_disabled(topic)) { + co_return errc::topic_disabled; + } + std::optional error; auto metadata_ref = tp_metadata.value().get(); @@ -1524,6 +1559,10 @@ topics_frontend::generate_reassignments( co_return errc::topic_not_exists; } + if (_topics.local().is_disabled(ntp)) { + co_return errc::partition_disabled; + } + auto tp_replication_factor = 
tp_metadata.value().get().get_replication_factor(); diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index a2b8cb37cd97..e392533eac17 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -1569,6 +1569,10 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( model::topic topic(req_topic.name); + const auto* disabled_set + = _metadata_cache.local().get_topic_disabled_set( + model::topic_namespace_view{model::kafka_namespace, topic}); + res_topic.results.reserve(req_topic.partitions.size()); for (model::partition_id req_partition : req_topic.partitions) { model::ntp ntp(model::kafka_namespace, topic, req_partition); @@ -1581,6 +1585,12 @@ ss::future tx_gateway_frontend::do_add_partition_to_tx( res_partition.partition_index = req_partition; res_partition.error_code = tx_errc::none; res_topic.results.push_back(res_partition); + } else if ( + disabled_set && disabled_set->is_disabled(req_partition)) { + add_paritions_tx_reply::partition_result res_partition; + res_partition.partition_index = req_partition; + res_partition.error_code = tx_errc::partition_disabled; + res_topic.results.push_back(res_partition); } else { new_partitions.push_back(ntp); } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 8ac09f7e39c0..34a286199259 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -149,6 +149,9 @@ std::ostream& operator<<(std::ostream& o, const tx_errc& err) { case tx_errc::tx_id_not_found: o << "tx_errc::tx_id_not_found"; break; + case tx_errc::partition_disabled: + o << "tx_errc::partition_disabled"; + break; } return o; } @@ -457,6 +460,8 @@ std::ostream& operator<<(std::ostream& o, const topic_table_delta_type& tp) { return o << "replicas_updated"; case topic_table_delta_type::properties_updated: return o << "properties_updated"; + case topic_table_delta_type::disabled_flag_updated: + return o << "disabled_flag_updated"; } __builtin_unreachable(); } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 330d0bf419a2..e722e10b0ab2 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -200,7 +200,8 @@ enum class tx_errc { invalid_txn_state, invalid_producer_epoch, tx_not_found, - tx_id_not_found + tx_id_not_found, + partition_disabled, }; std::ostream& operator<<(std::ostream&, const tx_errc&); @@ -2425,6 +2426,7 @@ enum class topic_table_delta_type { removed, replicas_updated, properties_updated, + disabled_flag_updated, }; std::ostream& operator<<(std::ostream&, const topic_table_delta_type&); diff --git a/src/v/kafka/server/errors.h b/src/v/kafka/server/errors.h index 4b1808502600..b0f77ae194e9 100644 --- a/src/v/kafka/server/errors.h +++ b/src/v/kafka/server/errors.h @@ -49,6 +49,9 @@ constexpr error_code map_topic_error_code(cluster::errc code) { return error_code::throttling_quota_exceeded; case cluster::errc::no_update_in_progress: return error_code::no_reassignment_in_progress; + case cluster::errc::topic_disabled: + case cluster::errc::partition_disabled: + return error_code::policy_violation; case cluster::errc::replication_error: case cluster::errc::shutting_down: case cluster::errc::join_request_dispatch_error: @@ -91,6 +94,7 @@ constexpr error_code map_topic_error_code(cluster::errc code) { case cluster::errc::transform_invalid_update: case cluster::errc::transform_invalid_source: case cluster::errc::trackable_keys_limit_exceeded: + case cluster::errc::invalid_partition_operation: break; } return error_code::unknown_server_error; diff --git 
a/src/v/kafka/server/handlers/delete_records.cc b/src/v/kafka/server/handlers/delete_records.cc index 5af5391a5e8e..68733d0f915b 100644 --- a/src/v/kafka/server/handlers/delete_records.cc +++ b/src/v/kafka/server/handlers/delete_records.cc @@ -230,8 +230,21 @@ delete_records_handler::handle(request_context ctx, ss::smp_service_group) { .partitions = std::move(topic_level_errors)}); return; } + + const auto* disabled_set + = ctx.metadata_cache().get_topic_disabled_set( + model::topic_namespace_view{model::kafka_namespace, topic.name}); + for (auto& partition : topic.partitions) { auto ktp = model::ktp(topic.name, partition.partition_index); + if ( + disabled_set + && disabled_set->is_disabled(partition.partition_index)) { + fs.push_back( + ss::make_ready_future(make_partition_error( + ktp, error_code::replica_not_available))); + continue; + } auto shard = ctx.shards().shard_for(ktp); if (!shard) { fs.push_back( diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 61f63262319e..6aa078c9c792 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -766,6 +766,15 @@ class simple_fetch_planner final : public fetch_planner::impl { auto& tp = fp.topic_partition; + if (unlikely(octx.rctx.metadata_cache().is_disabled( + tp.as_tn_view(), tp.get_partition()))) { + resp_it->set(make_partition_response_error( + fp.topic_partition.get_partition(), + error_code::replica_not_available)); + ++resp_it; + return; + } + auto shard = octx.rctx.shards().shard_for(tp); if (unlikely(!shard)) { // there is given partition in topic metadata, return diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc index 8e953520161f..9e880cdab0c3 100644 --- a/src/v/kafka/server/handlers/list_offsets.cc +++ b/src/v/kafka/server/handlers/list_offsets.cc @@ -181,6 +181,10 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { std::vector> partitions; partitions.reserve(topic.partitions.size()); + const auto* disabled_set + = octx.rctx.metadata_cache().get_topic_disabled_set( + model::topic_namespace_view{model::kafka_namespace, topic.name}); + for (auto& part : topic.partitions) { if (octx.request.duplicate_tp(topic.name, part.partition_index)) { partitions.push_back( @@ -201,6 +205,14 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { continue; } + if (disabled_set && disabled_set->is_disabled(part.partition_index)) { + partitions.push_back( + ss::make_ready_future( + list_offsets_response::make_partition( + part.partition_index, error_code::replica_not_available))); + continue; + } + auto pr = list_offsets_partition(octx, part.timestamp, topic, part); partitions.push_back(std::move(pr)); } diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index c8eb9e0c10f0..42dbc4d7aa8e 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -116,6 +116,7 @@ metadata_response::topic make_topic_response_from_topic_metadata( tp.is_internal = is_internal(tp_ns); const bool is_user_topic = model::is_user_topic(tp_ns); + const auto* disabled_set = md_cache.get_topic_disabled_set(tp_ns); for (const auto& p_as : tp_md.get_assignments()) { std::vector replicas{}; @@ -130,6 +131,8 @@ metadata_response::topic make_topic_response_from_topic_metadata( p.error_code = error_code::none; if (recovery_mode_enabled && is_user_topic) { p.error_code = error_code::policy_violation; + } else if (disabled_set 
&& disabled_set->is_disabled(p_as.id)) { + p.error_code = error_code::replica_not_available; } p.partition_index = p_as.id; p.leader_id = no_leader; diff --git a/src/v/kafka/server/handlers/offset_for_leader_epoch.cc b/src/v/kafka/server/handlers/offset_for_leader_epoch.cc index 42837b5e9629..ea13468b7fd6 100644 --- a/src/v/kafka/server/handlers/offset_for_leader_epoch.cc +++ b/src/v/kafka/server/handlers/offset_for_leader_epoch.cc @@ -133,6 +133,10 @@ get_offsets_for_leader_epochs( offset_for_leader_topic_result{.topic = request_topic.topic}); result.back().partitions.reserve(request_topic.partitions.size()); + const auto* disabled_set = ctx.metadata_cache().get_topic_disabled_set( + model::topic_namespace_view{ + model::kafka_namespace, request_topic.topic}); + for (auto& request_partition : request_topic.partitions) { // add response placeholder result.back().partitions.push_back(epoch_end_offset{}); @@ -151,6 +155,15 @@ get_offsets_for_leader_epochs( continue; } + if ( + disabled_set + && disabled_set->is_disabled(request_partition.partition)) { + partition_response = response_t::make_epoch_end_offset( + request_partition.partition, + error_code::replica_not_available); + continue; + } + auto shard = ctx.shards().shard_for(ktp); // no shard found, we may be in the middle of partition move, return // not leader for partition error diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index b786c30edf49..1f43b920c09e 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -498,7 +498,20 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { partitions_produced.reserve(topic.partitions.size()); partitions_dispatched.reserve(topic.partitions.size()); + const auto* disabled_set + = octx.rctx.metadata_cache().get_topic_disabled_set( + model::topic_namespace_view{model::kafka_namespace, topic.name}); + for (auto& part : topic.partitions) { + auto push_error_response = [&](error_code errc) { + partitions_dispatched.push_back(ss::now()); + partitions_produced.push_back( + ss::make_ready_future( + produce_response::partition{ + .partition_index = part.partition_index, + .error_code = errc})); + }; + const auto& kafka_noproduce_topics = config::shard_local_cfg().kafka_noproduce_topics(); const bool is_noproduce_topic = std::find( @@ -518,56 +531,38 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { if ( (is_noproduce_topic || audit_produce_restricted) && !is_audit_produce) { - partitions_dispatched.push_back(ss::now()); - partitions_produced.push_back( - ss::make_ready_future( - produce_response::partition{ - .partition_index = part.partition_index, - .error_code = error_code::topic_authorization_failed})); + push_error_response(error_code::topic_authorization_failed); continue; } if (!octx.rctx.metadata_cache().contains( model::topic_namespace_view(model::kafka_namespace, topic.name), part.partition_index)) { - partitions_dispatched.push_back(ss::now()); - partitions_produced.push_back( - ss::make_ready_future( - produce_response::partition{ - .partition_index = part.partition_index, - .error_code = error_code::unknown_topic_or_partition})); + push_error_response(error_code::unknown_topic_or_partition); + continue; + } + + if (unlikely( + disabled_set + && disabled_set->is_disabled(part.partition_index))) { + push_error_response(error_code::replica_not_available); continue; } // the record data on the wire was null value if (unlikely(!part.records)) { - 
partitions_dispatched.push_back(ss::now()); - partitions_produced.push_back( - ss::make_ready_future( - produce_response::partition{ - .partition_index = part.partition_index, - .error_code = error_code::invalid_record})); + push_error_response(error_code::invalid_record); continue; } // an error occurred handling legacy messages (magic 0 or 1) if (unlikely(part.records->adapter.legacy_error)) { - partitions_dispatched.push_back(ss::now()); - partitions_produced.push_back( - ss::make_ready_future( - produce_response::partition{ - .partition_index = part.partition_index, - .error_code = error_code::invalid_record})); + push_error_response(error_code::invalid_record); continue; } if (unlikely(!part.records->adapter.valid_crc)) { - partitions_dispatched.push_back(ss::now()); - partitions_produced.push_back( - ss::make_ready_future( - produce_response::partition{ - .partition_index = part.partition_index, - .error_code = error_code::corrupt_message})); + push_error_response(error_code::corrupt_message); continue; } @@ -581,12 +576,7 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { if (unlikely( !part.records->adapter.v2_format || !part.records->adapter.batch)) { - partitions_dispatched.push_back(ss::now()); - partitions_produced.push_back( - ss::make_ready_future( - produce_response::partition{ - .partition_index = part.partition_index, - .error_code = error_code::invalid_record})); + push_error_response(error_code::invalid_record); continue; } diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index 1b222bc555b1..17a0fcb3dafc 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -1050,6 +1050,10 @@ ss::future add_partitions_to_txn_handler::handle( case cluster::tx_errc::timeout: partition.error_code = error_code::request_timed_out; break; + case cluster::tx_errc::partition_disabled: + partition.error_code + = error_code::replica_not_available; + break; default: partition.error_code = error_code::unknown_server_error; diff --git a/src/v/redpanda/admin/server.cc b/src/v/redpanda/admin/server.cc index 498d0b7653dd..9468ab59e5a5 100644 --- a/src/v/redpanda/admin/server.cc +++ b/src/v/redpanda/admin/server.cc @@ -1169,6 +1169,7 @@ ss::future<> admin_server::throw_on_error( case cluster::errc::transform_invalid_environment: case cluster::errc::source_topic_not_exists: case cluster::errc::source_topic_still_in_use: + case cluster::errc::invalid_partition_operation: throw ss::httpd::bad_request_exception( fmt::format("{}", ec.message())); default: diff --git a/src/v/transform/api.cc b/src/v/transform/api.cc index cafea31491ca..5e67cfe0259a 100644 --- a/src/v/transform/api.cc +++ b/src/v/transform/api.cc @@ -77,16 +77,31 @@ class rpc_client_sink final : public sink { private: model::partition_id compute_output_partition() { - const auto& config = _topic_table->get_topic_cfg({ - model::kafka_namespace, - _topic, - }); + model::topic_namespace_view ns_tp{model::kafka_namespace, _topic}; + const auto& config = _topic_table->get_topic_cfg(ns_tp); if (!config) { throw std::runtime_error(ss::format( "unable to compute output partition for topic: {}", _topic)); } - return model::partition_id( - _input_partition_id % config->partition_count); + + const auto* disabled_set = _topic_table->get_topic_disabled_set(ns_tp); + if (!(disabled_set && disabled_set->is_fully_disabled())) { + // Do linear probing to find a non-disabled partition. The + // expectation is that most of the times we'll need just a few + // probes. 
+ for (int32_t i = 0; i < config->partition_count; ++i) { + model::partition_id candidate( + (_input_partition_id + i) % config->partition_count); + if (!(disabled_set && disabled_set->is_disabled(candidate))) { + return candidate; + } + } + } + + throw std::runtime_error(ss::format( + "unable to compute output partition for topic: {}, all output " + "partitions disabled", + _topic)); } model::topic _topic; diff --git a/tests/rptest/tests/recovery_mode_test.py b/tests/rptest/tests/recovery_mode_test.py index 187bee85bb70..8aba856c3808 100644 --- a/tests/rptest/tests/recovery_mode_test.py +++ b/tests/rptest/tests/recovery_mode_test.py @@ -11,6 +11,7 @@ import dataclasses from ducktape.utils.util import wait_until +from requests.exceptions import HTTPError from rptest.tests.redpanda_test import RedpandaTest from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST @@ -21,6 +22,28 @@ from rptest.util import wait_until_result +def _produce(test, topic, msg_count=1000, partition=None): + producer = RpkProducer(context=test.test_context, + redpanda=test.redpanda, + topic=topic, + msg_size=4096, + msg_count=msg_count, + partition=partition) + try: + producer.run() + finally: + producer.free() + + +def assert_rpk_fails(cmd, error_msg): + try: + cmd() + except RpkException: + pass + else: + assert False, error_msg + + class RecoveryModeTest(RedpandaTest): def __init__(self, *args, **kwargs): super().__init__(*args, num_brokers=4, **kwargs) @@ -29,17 +52,6 @@ def setUp(self): # start the nodes manually pass - def _produce(self, topic): - producer = RpkProducer(context=self.test_context, - redpanda=self.redpanda, - topic=topic, - msg_size=4096, - msg_count=1000) - try: - producer.run() - finally: - producer.free() - @cluster(num_nodes=5) def test_recovery_mode(self): """Test that after restarting the cluster in recovery mode, produce/consume is forbidden, @@ -62,7 +74,7 @@ def test_recovery_mode(self): rpk.create_topic("mytopic1", partitions=5, replicas=3) rpk.create_topic("mytopic2", partitions=5, replicas=3) - self._produce("mytopic1") + _produce(self, "mytopic1") partitions = list(rpk.describe_topic("mytopic1", tolerant=False)) assert len(partitions) == 5 @@ -91,32 +103,19 @@ def test_recovery_mode(self): assert len(partitions) == 5 assert all(p.load_error is not None for p in partitions) - try: - rpk.produce('mytopic1', 'key', 'msg') - except RpkException: - pass - else: - assert False, "producing should fail" + assert_rpk_fails(lambda: rpk.produce('mytopic1', 'key', 'msg'), + "producing should fail") - try: - rpk.consume('mytopic1', n=1000, quiet=True, timeout=10) - # rpk will retry indefinitely even in the presence of non-retryable errors, - # so just wait for the timeout to occur. - except RpkException: - pass - else: - assert False, "consuming should fail" + # rpk will retry indefinitely even in the presence of non-retryable errors, + # so just wait for the timeout to occur. 
+ assert_rpk_fails( + lambda: rpk.consume('mytopic1', n=1000, quiet=True, timeout=10), + "consuming should fail") - try: - rpk.consume('mytopic1', - n=1000, - group='mygroup3', - quiet=True, - timeout=10) - except RpkException: - pass - else: - assert False, "group consuming should fail" + assert_rpk_fails( + lambda: rpk.consume( + 'mytopic1', n=1000, group='mygroup3', quiet=True, timeout=10), + "group consuming should fail") # check consumer group ops @@ -177,7 +176,7 @@ def test_recovery_mode(self): # check that produce and consume work - self._produce("mytopic1") + _produce(self, "mytopic1") def partitions_ready(): partitions = list(rpk.describe_topic("mytopic1", tolerant=False)) @@ -200,7 +199,7 @@ def partitions_ready(): assert len(consumed) == 2000 -@dataclasses.dataclass +@dataclasses.dataclass(frozen=True) class PartitionInfo: ns: str topic: str @@ -242,6 +241,13 @@ def test_apis(self): rpk = RpkTool(self.redpanda) admin = Admin(self.redpanda) + try: + admin.set_partitions_disabled(ns="redpanda", topic="controller") + except HTTPError as e: + assert e.response.status_code == 400 + else: + assert False, "disabling internal topics should fail" + topics = ["mytopic1", "mytopic2", "mytopic3", "mytopic4"] for topic in topics: rpk.create_topic(topic, partitions=3, replicas=3) @@ -352,3 +358,162 @@ def check_everything(): self.redpanda.wait_for_membership(first_start=False) check_everything() + + @cluster(num_nodes=5) + def test_disable(self): + """ + Test that disabled partitions are shut down and that producing/consuming + to them errors out, while producing/consuming to other partitions is + still possible. + """ + + rpk = RpkTool(self.redpanda) + admin = Admin(self.redpanda) + + topics = ["mytopic1", "mytopic2"] + for topic in topics: + rpk.create_topic(topic, partitions=2, replicas=3) + + def get_node_partitions(node): + return [ + f'{p["topic"]}/{p["partition_id"]}' + for p in admin.get_partitions(node=node) + if p["topic"].startswith("mytopic") + ] + + def get_partition_counts(): + ret = dict() + for n in self.redpanda.nodes: + for p in get_node_partitions(n): + ret[p] = ret.setdefault(p, 0) + 1 + self.logger.debug(f"partition counts: {ret}") + return ret + + def all_created(): + pc = get_partition_counts() + return len(pc) == 4 and all(c == 3 for c in pc.values()) + + wait_until(all_created, + timeout_sec=30, + backoff_sec=1, + err_msg="failed to wait until all partitions are created") + + _produce(self, "mytopic1") + _produce(self, "mytopic2") + + def get_hwms(topic=None): + ts = topics if topic is None else [topic] + ret = dict() + for topic in ts: + for p in rpk.describe_topic(topic): + ret[f"{topic}/{p.id}"] = p.high_watermark + return ret + + orig_hwms = get_hwms() + assert len(orig_hwms) == 4 + for topic in topics: + assert sum(hwm for ntp, hwm in orig_hwms.items() + if ntp.startswith(topic)) == 1000 + + self.logger.info("disabling partitions") + + admin.set_partitions_disabled(ns="kafka", + topic="mytopic1", + partition=1) + admin.set_partitions_disabled(ns="kafka", topic="mytopic2") + + def all_disabled_shut_down(): + pc = get_partition_counts() + return set(pc.keys()) == {"mytopic1/0"} and all( + c == 3 for c in pc.values()) + + wait_until( + all_disabled_shut_down, + timeout_sec=30, + backoff_sec=1, + err_msg="failed to wait until all disabled partitions are shut down" + ) + + hwms2 = get_hwms() + assert len(hwms2) == 1 + assert hwms2["mytopic1/0"] == orig_hwms["mytopic1/0"] + + # test that producing to disabled partitions fails, while producing to + # the remaining 
partition is still possible + + _produce(self, "mytopic1", partition=0) + + assert_rpk_fails( + lambda: rpk.produce('mytopic1', 'key', 'msg', partition=1), + "producing should fail") + + assert_rpk_fails(lambda: rpk.produce('mytopic2', 'key', 'msg'), + "producing should fail") + + hwms3 = get_hwms() + assert len(hwms3) == 1 + assert hwms3["mytopic1/0"] == hwms2["mytopic1/0"] + 1000 + + # the same with consuming + + assert len( + rpk.consume( + 'mytopic1', n=hwms3["mytopic1/0"], + quiet=True).rstrip().split('\n')) == hwms3["mytopic1/0"] + + assert_rpk_fails( + lambda: rpk.consume( + 'mytopic1', n=hwms3["mytopic1/0"] + 1, timeout=10), + "consuming should fail") + + assert_rpk_fails(lambda: rpk.consume('mytopic2', n=1, timeout=10), + "consuming should fail") + + self.logger.info("restarting cluster") + + self.redpanda.restart_nodes(self.redpanda.nodes) + self.redpanda.wait_for_membership(first_start=False) + + # test that partitions are still disabled after restart + + assert all_disabled_shut_down() + + assert get_hwms() == hwms3 + + self.logger.info("enabling partitions back") + + admin.set_partitions_disabled(ns="kafka", + topic="mytopic1", + value=False) + admin.set_partitions_disabled(ns="kafka", + topic="mytopic2", + value=False) + + wait_until(all_created, + timeout_sec=30, + backoff_sec=1, + err_msg="failed to wait until all partitions are created") + + wait_until(lambda: set(get_hwms().keys()) == set(orig_hwms.keys()), + timeout_sec=30, + backoff_sec=1, + err_msg="failed to wait until all leaders elected") + + # test that producing and consuming work after re-enabling all partitions + + hwms4 = get_hwms() + assert set(hwms4.keys()) == set(orig_hwms.keys()) + for ntp, hwm in hwms4.items(): + if ntp == "mytopic1/0": + assert hwm == orig_hwms[ntp] + 1000 + else: + assert hwm == orig_hwms[ntp] + + _produce(self, "mytopic1") + _produce(self, "mytopic2") + + assert sum(hwm for hwm in get_hwms("mytopic1").values()) == 3000 + assert sum(hwm for hwm in get_hwms("mytopic2").values()) == 2000 + + rpk.consume('mytopic1', n=3000, quiet=True) + rpk.consume('mytopic2', n=2000, quiet=True)
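
The checks introduced throughout this patch (is_disabled, is_fully_disabled, get_topic_disabled_set, set_fully_disabled, is_fully_enabled) all lean on the same small contract. The sketch below illustrates that contract only; it is not the real topic_disabled_partitions_set definition, and the member names as well as the choice to encode "fully disabled" as a disengaged optional are assumptions made for the example.

// Illustrative sketch only: a minimal stand-in for topic_disabled_partitions_set
// showing the semantics the call sites in this diff depend on.
#include <cstdint>
#include <optional>
#include <set>

struct partition_id {
    int32_t value;
    auto operator<=>(const partition_id&) const = default;
};

struct disabled_partitions_sketch {
    // engaged + empty   -> fully enabled (no partition disabled)
    // engaged + entries -> only the listed partitions are disabled
    // disengaged        -> every partition of the topic is disabled
    std::optional<std::set<partition_id>> partitions = std::set<partition_id>{};

    bool is_fully_disabled() const { return !partitions.has_value(); }
    bool is_fully_enabled() const { return partitions && partitions->empty(); }
    bool is_disabled(partition_id id) const {
        return is_fully_disabled() || partitions->contains(id);
    }
    void set_fully_disabled() { partitions.reset(); }
    void add(partition_id id) { if (partitions) { partitions->insert(id); } }
    void remove(partition_id id) { if (partitions) { partitions->erase(id); } }
};

int main() {
    disabled_partitions_sketch s;
    s.add(partition_id{1});
    bool p0 = s.is_disabled(partition_id{0}); // false: only partition 1 is disabled
    bool p1 = s.is_disabled(partition_id{1}); // true
    s.set_fully_disabled();
    bool all = s.is_fully_disabled(); // true: the whole topic is now disabled
    return (!p0 && p1 && all) ? 0 : 1;
}

topic_table keeps one such set per topic in _disabled_partitions and erases the entry once it becomes fully enabled again, which is why callers fetch it via get_topic_disabled_set() and treat a null pointer as "nothing disabled".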
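
The output-partition selection added to src/v/transform/api.cc can also be read in isolation as the routine below. This is a sketch under the diff's own assumption that disabled partitions are rare, so probing forward from the input partition usually stops after a few iterations; the function name and the callable parameter are illustrative, not part of the actual API. (The real code short-circuits the fully-disabled case before probing; here that case simply falls through to the empty result.)

#include <cstdint>
#include <functional>
#include <optional>

// Probe forward from the "natural" mapping (input partition modulo the output
// partition count) until a non-disabled partition is found.
std::optional<int32_t> pick_output_partition(
  int32_t input_partition,
  int32_t partition_count,
  const std::function<bool(int32_t)>& is_disabled) {
    for (int32_t i = 0; i < partition_count; ++i) {
        int32_t candidate = (input_partition + i) % partition_count;
        if (!is_disabled(candidate)) {
            return candidate;
        }
    }
    return std::nullopt; // every output partition is disabled
}

int main() {
    auto disabled = [](int32_t p) { return p == 2; }; // pretend partition 2 is disabled
    auto out = pick_output_partition(2, 3, disabled); // probes 2, then 0 -> returns 0
    return out.value_or(-1) == 0 ? 0 : 1;
}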