Skip to content

Commit

Permalink
cluster: add abort source to partition manager
Browse files Browse the repository at this point in the history
...and use it in remote_partition::erase.  This is necessary
because we now require a usable abort source in all cloud
storage paths, and the partition's abort source is already
fired once we get to removing the persistent state.
  • Loading branch information
jcsp committed Jan 17, 2023
1 parent 632676c commit c10e266
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ ss::future<bool> remote_partition::tolerant_delete_object(
* S3, not the state of the archival metadata stm (which can be out
* of date if e.g. we were not the leader)
*/
ss::future<> remote_partition::erase() {
ss::future<> remote_partition::erase(ss::abort_source& as) {
// TODO: Edge case 1
// There is a rare race in which objects might get left behind in S3.
//
Expand All @@ -774,7 +774,11 @@ ss::future<> remote_partition::erase() {

static constexpr ss::lowres_clock::duration erase_timeout = 60s;
static constexpr ss::lowres_clock::duration erase_backoff = 1s;
retry_chain_node local_rtc(erase_timeout, erase_backoff, &_rtc);

// This function is called after ::stop, so we may not use our
// main retry_chain_node which is bound to our abort source,
// and construct a special one.
retry_chain_node local_rtc(as, erase_timeout, erase_backoff);

// Read the partition manifest fresh: we might already have
// dropped local archival_stm state related to this partition.
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_storage/remote_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class remote_partition
retry_chain_node& parent);

/// Remove objects from S3
ss::future<> erase();
ss::future<> erase(ss::abort_source&);

/// Hook for materialized_segment to notify us when a segment is evicted
void offload_segment(model::offset);
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ ss::future<> partition::remove_persistent_state() {
}
}

ss::future<> partition::remove_remote_persistent_state() {
ss::future<> partition::remove_remote_persistent_state(ss::abort_source& as) {
// Backward compatibility: even if remote.delete is true, only do
// deletion if the partition is in full tiered storage mode (this
// excludes read replica clusters from deleting data in S3)
Expand All @@ -527,7 +527,7 @@ ss::future<> partition::remove_remote_persistent_state() {
get_ntp_config(),
get_ntp_config().is_archival_enabled(),
get_ntp_config().is_read_replica_mode_enabled());
co_await _cloud_storage_partition->erase();
co_await _cloud_storage_partition->erase(as);
} else {
vlog(
clusterlog.info, "Leaving S3 objects behind for partition {}", ntp());
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class partition {
std::optional<model::timeout_clock::time_point> deadline = std::nullopt);

ss::future<> remove_persistent_state();
ss::future<> remove_remote_persistent_state();
ss::future<> remove_remote_persistent_state(ss::abort_source& as);

std::optional<model::offset> get_term_last_offset(model::term_id) const;

Expand Down
7 changes: 4 additions & 3 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ partition_manager::maybe_download_log(
}

ss::future<> partition_manager::stop_partitions() {
_as.request_abort();

_raft_manager.local().unregister_leadership_notification(
*_leader_notify_handle);
_leader_notify_handle.reset();
Expand Down Expand Up @@ -287,9 +289,9 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {
.then([partition] { return partition->stop(); })
.then([partition] { return partition->remove_persistent_state(); })
.then([this, ntp] { return _storage.log_mgr().remove(ntp); })
.then([partition, mode] {
.then([this, partition, mode] {
if (mode == partition_removal_mode::global) {
return partition->remove_remote_persistent_state();
return partition->remove_remote_persistent_state(_as);
} else {
return ss::now();
}
Expand All @@ -299,7 +301,6 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {

ss::future<> partition_manager::shutdown(const model::ntp& ntp) {
auto partition = get(ntp);

if (!partition) {
return ss::make_exception_future<>(std::invalid_argument(fmt::format(
"Can not shutdown partition. NTP {} is not present in partition "
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ class partition_manager {
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<cluster::tm_stm_cache>& _tm_stm_cache;
ss::gate _gate;

// In general, all our background work is in partition objects which
// have their own abort source. This abort source is only for work that
// happens after partition stop, during deletion.
ss::abort_source _as;

bool _block_new_leadership{false};

config::binding<uint64_t> _max_concurrent_producer_ids;
Expand Down

0 comments on commit c10e266

Please sign in to comment.