Skip to content

Commit

Permalink
utils: eliminate default constructor for retry_chain_node
Browse files Browse the repository at this point in the history
Constructing a retry_chain_node without an abort source
never makes sense: it is always used in a context where
it should be abortable.
  • Loading branch information
jcsp committed Jan 17, 2023
1 parent 7aa73d1 commit 632676c
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ uint64_t ntp_archiver::estimate_backlog_size() {

ss::future<std::optional<cloud_storage::partition_manifest>>
ntp_archiver::maybe_truncate_manifest() {
retry_chain_node rtc;
retry_chain_node rtc(_as);
ss::gate::holder gh(_gate);
retry_chain_logger ctxlog(archival_log, rtc, _ntp.path());
vlog(ctxlog.info, "archival metadata cleanup started");
Expand Down
10 changes: 6 additions & 4 deletions src/v/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ using namespace archival;

inline ss::logger test_log("test"); // NOLINT

static ss::abort_source never_abort;

static const auto manifest_namespace = model::ns("kafka"); // NOLINT
static const auto manifest_topic = model::topic("test-topic"); // NOLINT
static const auto manifest_partition = model::partition_id(42); // NOLINT
Expand Down Expand Up @@ -118,7 +120,7 @@ FIXTURE_TEST(test_upload_segments, archiver_fixture) {
archival::ntp_archiver archiver(get_ntp_conf(), arch_conf, remote, *part);
auto action = ss::defer([&archiver] { archiver.stop().get(); });

retry_chain_node fib;
retry_chain_node fib(never_abort);
auto res = upload_next_with_retries(archiver).get0();

auto non_compacted_result = res.non_compacted_upload_result;
Expand Down Expand Up @@ -235,7 +237,7 @@ FIXTURE_TEST(test_retention, archiver_fixture) {
archival::ntp_archiver archiver(get_ntp_conf(), arch_conf, remote, *part);
auto action = ss::defer([&archiver] { archiver.stop().get(); });

retry_chain_node fib;
retry_chain_node fib(never_abort);
auto res = upload_next_with_retries(archiver).get0();
BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_succeeded, 4);
BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_failed, 0);
Expand Down Expand Up @@ -714,7 +716,7 @@ FIXTURE_TEST(test_upload_segments_leadership_transfer, archiver_fixture) {
archival::ntp_archiver archiver(get_ntp_conf(), arch_conf, remote, *part);
auto action = ss::defer([&archiver] { archiver.stop().get(); });

retry_chain_node fib;
retry_chain_node fib(never_abort);

auto res = upload_next_with_retries(archiver).get0();

Expand Down Expand Up @@ -926,7 +928,7 @@ static void test_partial_upload_impl(
archival::ntp_archiver archiver(get_ntp_conf(), aconf, remote, *part);
auto action = ss::defer([&archiver] { archiver.stop().get(); });

retry_chain_node fib;
retry_chain_node fib(never_abort);
part->stop_archiver().get();
test.listen();
auto res = upload_next_with_retries(archiver, lso).get0();
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/partition_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class missing_partition_exception final : public std::exception {
partition_recovery_manager::partition_recovery_manager(
cloud_storage_clients::bucket_name bucket, ss::sharded<remote>& remote)
: _bucket(std::move(bucket))
, _remote(remote) {}
, _remote(remote)
, _root(_as) {}

partition_recovery_manager::~partition_recovery_manager() {
vassert(_gate.is_closed(), "S3 downloader is not stopped properly");
Expand Down
6 changes: 4 additions & 2 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ class partition_record_batch_reader_impl final
const storage::log_reader_config& config,
ss::shared_ptr<remote_partition> part,
ss::lw_shared_ptr<storage::offset_translator_state> ot_state) noexcept
: _ctxlog(cst_log, _rtc, part->get_ntp().path())
: _rtc(part->_as)
, _ctxlog(cst_log, _rtc, part->get_ntp().path())
, _partition(std::move(part))
, _ot_state(std::move(ot_state))
, _gate_guard(_partition->_gate) {
Expand Down Expand Up @@ -471,7 +472,7 @@ remote_partition::remote_partition(
remote& api,
cache& c,
cloud_storage_clients::bucket_name bucket)
: _rtc()
: _rtc(_as)
, _ctxlog(cst_log, _rtc, m.get_ntp().path())
, _api(api)
, _cache(c)
Expand Down Expand Up @@ -580,6 +581,7 @@ remote_partition::aborted_transactions(offset_range offsets) {
ss::future<> remote_partition::stop() {
vlog(_ctxlog.debug, "remote partition stop {} segments", _segments.size());

_as.request_abort();
co_await _gate.close();
// Remove materialized_segment_state from the list that contains it, to
// avoid it getting registered for eviction and stop.
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/remote_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class remote_partition
retry_chain_node _rtc;
retry_chain_logger _ctxlog;
ss::gate _gate;
ss::abort_source _as;
remote& _api;
cache& _cache;
const partition_manifest& _manifest;
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/offset_translation_layer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ inline ss::input_stream<char> make_manifest_stream(std::string_view json) {
}

SEASTAR_THREAD_TEST_CASE(test_name_translation) {
retry_chain_node fib;
ss::abort_source never_abort;
retry_chain_node fib(never_abort);
partition_manifest m;
m.update(make_manifest_stream(serialized_manifest)).get0();
BOOST_REQUIRE_EQUAL(m.size(), 3);
Expand Down
6 changes: 0 additions & 6 deletions src/v/utils/retry_chain_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ static constexpr size_t max_retry_chain_depth = 8;
static constexpr uint16_t max_retry_count = std::numeric_limits<uint16_t>::max()
- 1;

retry_chain_node::retry_chain_node()
: _id(fiber_count++) // generate new head id
, _backoff{0}
, _deadline{ss::lowres_clock::time_point::min()}
, _parent() {}

retry_chain_node::retry_chain_node(ss::abort_source& as)
: _id(fiber_count++) // generate new head id
, _backoff{0}
Expand Down
5 changes: 3 additions & 2 deletions src/v/utils/retry_chain_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ class retry_chain_node {
using milliseconds_uint16_t
= std::chrono::duration<uint16_t, std::chrono::milliseconds::period>;

/// Create a head of the chain without backoff
retry_chain_node();
// No default constructor: we always need an abort source.
retry_chain_node() = delete;

/// Create a head of the chain without backoff but with abort_source
explicit retry_chain_node(ss::abort_source& as);
/// Creates a head with the provided abort_source, deadline, and
Expand Down

0 comments on commit 632676c

Please sign in to comment.