cluster: best-effort quiesce of archiver on leadership transfer
This substantially reduces the probability of leaving orphaned
objects in the object store when partitions change leadership
under load, e.g. during upgrades or leader balancing.

This fixes a failure in a test that indirectly detects orphan
objects by checking that topic deletion clears all objects.

Fixes redpanda-data#8496
jcsp committed Feb 15, 2023
1 parent b5d9395 commit 86cbfee
Showing 2 changed files with 37 additions and 6 deletions.
15 changes: 10 additions & 5 deletions src/v/archival/ntp_archiver_service.cc
@@ -322,12 +322,13 @@ ss::future<> ntp_archiver::upload_topic_manifest() {
ss::future<> ntp_archiver::upload_until_term_change() {
ss::lowres_clock::duration backoff = _conf->upload_loop_initial_backoff;

// Hold semaphore units to enable other code to know that we are in
// the process of doing uploads + wait for us to drop out if they
// e.g. set _paused.
auto units = co_await ss::get_units(_uploads_active, 1);

while (can_update_archival_metadata()) {
// Hold semaphore units to enable other code to know that we are in
// the process of doing uploads + wait for us to drop out if they
// e.g. set _paused.
vassert(!_paused, "can_update_archival_metadata must ensure !_paused");
auto units = co_await ss::get_units(_uploads_active, 1);

// Bump up archival STM's state to make sure that it's not lagging
// behind too far. If the STM is lagging behind we will have to read a
// lot of data next time we upload something.
@@ -392,6 +393,10 @@ ss::future<> ntp_archiver::upload_until_term_change() {

update_probe();

// Drop _uploads_active lock: we are not considered active while
// sleeping for backoff at the end of the loop.
units.return_all();

if (non_compacted_upload_result.num_succeeded == 0) {
// The backoff algorithm here is used to prevent high CPU
// utilization when redpanda is not receiving any data and there
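
Note on the change above: the upload loop now acquires _uploads_active per
iteration and releases it before sleeping for backoff, which gives other code a
clean quiesce point. The archiver's prepare_transfer_leadership() and
complete_transfer_leadership() are not shown in this diff (they are called from
partition.cc below), so the following is only a rough sketch of their likely
shape, assuming they are built on the _paused flag and the _uploads_active
semaphore referenced in the comments above; the struct, types and signatures
here are illustrative, not the actual implementation.

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/semaphore.hh>

    namespace ss = seastar;

    // Hypothetical, simplified stand-in for the archiver members referenced in
    // the diff: _paused gates the upload loop, _uploads_active is held while an
    // upload pass is in flight.
    struct archiver_sketch {
        bool _paused{false};
        ss::semaphore _uploads_active{1};

        // Best-effort quiesce: stop new upload passes, then wait (bounded) for
        // the in-flight pass to release its unit. Returns false on timeout.
        ss::future<bool> prepare_transfer_leadership(ss::semaphore::duration timeout) {
            _paused = true;
            try {
                auto units = co_await ss::get_units(
                  _uploads_active, 1, ss::semaphore::clock::now() + timeout);
                co_return true;  // quiesced cleanly; _paused keeps the loop idle
            } catch (const ss::semaphore_timed_out&) {
                co_return false; // caller proceeds with the transfer anyway
            }
        }

        // Invoked via the deferred_action in partition.cc below: let uploads
        // resume in case the transfer failed and this node remains leader.
        void complete_transfer_leadership() { _paused = false; }
    };

If the bounded wait times out, partition::transfer_leadership() below logs a
warning and proceeds anyway; that is the "best-effort" in the commit title, and
it is the case in which an in-flight segment upload can still be orphaned.
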
28 changes: 27 additions & 1 deletion src/v/cluster/partition.cc
@@ -20,6 +20,8 @@
#include "prometheus/prometheus_sanitize.h"
#include "raft/types.h"

#include <seastar/util/defer.hh>

namespace cluster {

static bool is_id_allocator_topic(model::ntp ntp) {
@@ -621,7 +623,8 @@ ss::future<std::error_code>
partition::transfer_leadership(std::optional<model::node_id> target) {
vlog(
clusterlog.debug,
"Transferring leadership to {}",
"Transferring {} leadership to {}",
ntp(),
target.value_or(model::node_id{-1}));

// Some state machines need a preparatory phase to efficiently transfer
@@ -633,6 +636,29 @@ partition::transfer_leadership(std::optional<model::node_id> target) {
} else if (_tm_stm) {
stm_prepare_lock = co_await _tm_stm->prepare_transfer_leadership();
}

std::optional<ss::deferred_action<std::function<void()>>> complete_archiver;
auto archival_timeout
= config::shard_local_cfg().cloud_storage_graceful_transfer_timeout_ms();
if (_archiver && archival_timeout.has_value()) {
complete_archiver.emplace(
[a = _archiver.get()]() { a->complete_transfer_leadership(); });
bool archiver_clean = co_await _archiver->prepare_transfer_leadership(
archival_timeout.value());
if (!archiver_clean) {
// This is legal: if we are very tight on bandwidth to S3, then it
// can take longer than the available timeout for an upload of
// a large segment to complete. If this happens, we will leak
// an object, but retain a consistent+correct manifest when
// the new leader writes it.
vlog(
clusterlog.warn,
"Timed out waiting for {} uploads to complete before "
"transferring leadership: proceeding anyway",
ntp());
}
}

co_return co_await _raft->do_transfer_leadership(target);
}

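A usage note on the cleanup above: ss::deferred_action (from
<seastar/util/defer.hh>, now included at the top of partition.cc) runs its
callable when it goes out of scope unless cancelled, so
complete_transfer_leadership() fires whether do_transfer_leadership() succeeds,
fails, or throws; that matters because a failed transfer leaves this node as
leader and uploads must be allowed to resume. A minimal sketch of the same
conditionally-armed pattern, with a hypothetical archiver_stub standing in for
ntp_archiver:

    #include <seastar/util/defer.hh>

    #include <functional>
    #include <optional>

    namespace ss = seastar;

    // Hypothetical stand-in for ntp_archiver; only the shape of the call matters.
    struct archiver_stub {
        void complete_transfer_leadership() {}
    };

    void transfer_with_cleanup(archiver_stub* archiver, bool archiver_prepared) {
        // Armed only when the matching prepare step actually ran.
        std::optional<ss::deferred_action<std::function<void()>>> cleanup;
        if (archiver_prepared) {
            cleanup.emplace(
              [archiver] { archiver->complete_transfer_leadership(); });
        }
        // ... perform the raft leadership transfer here; an early return or an
        // exception on this path still runs the emplaced action when `cleanup`
        // goes out of scope.
    }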
