diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 191c00fc7b52..deade6272a09 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -1104,7 +1104,7 @@ uint64_t ntp_archiver::estimate_backlog_size() { ss::future> ntp_archiver::maybe_truncate_manifest() { - retry_chain_node rtc; + retry_chain_node rtc(_as); ss::gate::holder gh(_gate); retry_chain_logger ctxlog(archival_log, rtc, _ntp.path()); vlog(ctxlog.info, "archival metadata cleanup started"); diff --git a/src/v/archival/tests/ntp_archiver_test.cc b/src/v/archival/tests/ntp_archiver_test.cc index cade49a96b9b..78bb9d55c416 100644 --- a/src/v/archival/tests/ntp_archiver_test.cc +++ b/src/v/archival/tests/ntp_archiver_test.cc @@ -35,6 +35,8 @@ using namespace archival; inline ss::logger test_log("test"); // NOLINT +static ss::abort_source never_abort; + static const auto manifest_namespace = model::ns("kafka"); // NOLINT static const auto manifest_topic = model::topic("test-topic"); // NOLINT static const auto manifest_partition = model::partition_id(42); // NOLINT @@ -118,7 +120,7 @@ FIXTURE_TEST(test_upload_segments, archiver_fixture) { archival::ntp_archiver archiver(get_ntp_conf(), arch_conf, remote, *part); auto action = ss::defer([&archiver] { archiver.stop().get(); }); - retry_chain_node fib; + retry_chain_node fib(never_abort); auto res = upload_next_with_retries(archiver).get0(); auto non_compacted_result = res.non_compacted_upload_result; @@ -235,7 +237,7 @@ FIXTURE_TEST(test_retention, archiver_fixture) { archival::ntp_archiver archiver(get_ntp_conf(), arch_conf, remote, *part); auto action = ss::defer([&archiver] { archiver.stop().get(); }); - retry_chain_node fib; + retry_chain_node fib(never_abort); auto res = upload_next_with_retries(archiver).get0(); BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_succeeded, 4); BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_failed, 0); @@ -714,7 +716,7 @@ FIXTURE_TEST(test_upload_segments_leadership_transfer, archiver_fixture) { archival::ntp_archiver archiver(get_ntp_conf(), arch_conf, remote, *part); auto action = ss::defer([&archiver] { archiver.stop().get(); }); - retry_chain_node fib; + retry_chain_node fib(never_abort); auto res = upload_next_with_retries(archiver).get0(); @@ -926,7 +928,7 @@ static void test_partial_upload_impl( archival::ntp_archiver archiver(get_ntp_conf(), aconf, remote, *part); auto action = ss::defer([&archiver] { archiver.stop().get(); }); - retry_chain_node fib; + retry_chain_node fib(never_abort); part->stop_archiver().get(); test.listen(); auto res = upload_next_with_retries(archiver, lso).get0(); diff --git a/src/v/cloud_storage/partition_recovery_manager.cc b/src/v/cloud_storage/partition_recovery_manager.cc index 6f83cb5f4317..d66542fcffc4 100644 --- a/src/v/cloud_storage/partition_recovery_manager.cc +++ b/src/v/cloud_storage/partition_recovery_manager.cc @@ -66,7 +66,8 @@ class missing_partition_exception final : public std::exception { partition_recovery_manager::partition_recovery_manager( cloud_storage_clients::bucket_name bucket, ss::sharded& remote) : _bucket(std::move(bucket)) - , _remote(remote) {} + , _remote(remote) + , _root(_as) {} partition_recovery_manager::~partition_recovery_manager() { vassert(_gate.is_closed(), "S3 downloader is not stopped properly"); diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index 5bc8b188895c..1bee0958f697 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -176,7 +176,8 @@ class partition_record_batch_reader_impl final const storage::log_reader_config& config, ss::shared_ptr part, ss::lw_shared_ptr ot_state) noexcept - : _ctxlog(cst_log, _rtc, part->get_ntp().path()) + : _rtc(part->_as) + , _ctxlog(cst_log, _rtc, part->get_ntp().path()) , _partition(std::move(part)) , _ot_state(std::move(ot_state)) , _gate_guard(_partition->_gate) { @@ -471,7 +472,7 @@ remote_partition::remote_partition( remote& api, cache& c, cloud_storage_clients::bucket_name bucket) - : _rtc() + : _rtc(_as) , _ctxlog(cst_log, _rtc, m.get_ntp().path()) , _api(api) , _cache(c) @@ -580,6 +581,7 @@ remote_partition::aborted_transactions(offset_range offsets) { ss::future<> remote_partition::stop() { vlog(_ctxlog.debug, "remote partition stop {} segments", _segments.size()); + _as.request_abort(); co_await _gate.close(); // Remove materialized_segment_state from the list that contains it, to // avoid it getting registered for eviction and stop. diff --git a/src/v/cloud_storage/remote_partition.h b/src/v/cloud_storage/remote_partition.h index 6ef19d1bbc5e..753ae702dd29 100644 --- a/src/v/cloud_storage/remote_partition.h +++ b/src/v/cloud_storage/remote_partition.h @@ -178,6 +178,7 @@ class remote_partition retry_chain_node _rtc; retry_chain_logger _ctxlog; ss::gate _gate; + ss::abort_source _as; remote& _api; cache& _cache; const partition_manifest& _manifest; diff --git a/src/v/cloud_storage/tests/offset_translation_layer_test.cc b/src/v/cloud_storage/tests/offset_translation_layer_test.cc index cc673645e464..667aab1affd0 100644 --- a/src/v/cloud_storage/tests/offset_translation_layer_test.cc +++ b/src/v/cloud_storage/tests/offset_translation_layer_test.cc @@ -77,7 +77,8 @@ inline ss::input_stream make_manifest_stream(std::string_view json) { } SEASTAR_THREAD_TEST_CASE(test_name_translation) { - retry_chain_node fib; + ss::abort_source never_abort; + retry_chain_node fib(never_abort); partition_manifest m; m.update(make_manifest_stream(serialized_manifest)).get0(); BOOST_REQUIRE_EQUAL(m.size(), 3); diff --git a/src/v/utils/retry_chain_node.cc b/src/v/utils/retry_chain_node.cc index 2e753bdd1f6b..b5a26c56b1c8 100644 --- a/src/v/utils/retry_chain_node.cc +++ b/src/v/utils/retry_chain_node.cc @@ -33,12 +33,6 @@ static constexpr size_t max_retry_chain_depth = 8; static constexpr uint16_t max_retry_count = std::numeric_limits::max() - 1; -retry_chain_node::retry_chain_node() - : _id(fiber_count++) // generate new head id - , _backoff{0} - , _deadline{ss::lowres_clock::time_point::min()} - , _parent() {} - retry_chain_node::retry_chain_node(ss::abort_source& as) : _id(fiber_count++) // generate new head id , _backoff{0} diff --git a/src/v/utils/retry_chain_node.h b/src/v/utils/retry_chain_node.h index 003faeeda110..b604a3c19995 100644 --- a/src/v/utils/retry_chain_node.h +++ b/src/v/utils/retry_chain_node.h @@ -200,8 +200,9 @@ class retry_chain_node { using milliseconds_uint16_t = std::chrono::duration; - /// Create a head of the chain without backoff - retry_chain_node(); + // No default constructor: we always need an abort source. + retry_chain_node() = delete; + /// Create a head of the chain without backoff but with abort_source explicit retry_chain_node(ss::abort_source& as); /// Creates a head with the provided abort_source, deadline, and