Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v22.3.x] archival: remove timeouts from archival_metadata_stm #12690

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ void ntp_archiver::run_sync_manifest_loop() {
ssx::spawn_with_gate(_gate, [this] {
return sync_manifest_loop()
.handle_exception_type([](const ss::abort_requested_exception&) {})
.handle_exception_type([](const ss::broken_semaphore&) {})
.handle_exception_type([](const ss::broken_named_semaphore&) {})
.handle_exception_type([](const ss::sleep_aborted&) {})
.handle_exception_type([](const ss::gate_closed_exception&) {})
.handle_exception_type([this](const ss::semaphore_timed_out& e) {
Expand Down Expand Up @@ -170,6 +172,8 @@ void ntp_archiver::run_upload_loop() {
.handle_exception_type([](const ss::abort_requested_exception&) {})
.handle_exception_type([](const ss::sleep_aborted&) {})
.handle_exception_type([](const ss::gate_closed_exception&) {})
.handle_exception_type([](const ss::broken_semaphore&) {})
.handle_exception_type([](const ss::broken_named_semaphore&) {})
.handle_exception_type([this](const ss::semaphore_timed_out& e) {
vlog(
_rtclog.warn,
Expand Down
54 changes: 27 additions & 27 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ ss::future<std::error_code> command_batch_builder::replicate() {
return ss::make_ready_future<std::error_code>(errc::timeout);
}
auto batch = std::move(_builder).build();
return _stm.get().do_replicate_commands(
std::move(batch), _deadline, _as);
return _stm.get().do_replicate_commands(std::move(batch), _as);
});
});
}
Expand Down Expand Up @@ -303,45 +302,46 @@ ss::future<std::error_code> archival_metadata_stm::cleanup_metadata(

ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
model::record_batch batch,
ss::lowres_clock::time_point deadline,
std::optional<std::reference_wrapper<ss::abort_source>> as) {
auto current_term = _insync_term;
auto fut = _raft->replicate(
_insync_term,
current_term,
model::make_memory_record_batch_reader(std::move(batch)),
raft::replicate_options{raft::consistency_level::quorum_ack});

// Run with an abort source so shutdown doesn't have to wait a full
// replication timeout to proceed.
if (as) {
fut = ssx::with_timeout_abortable(std::move(fut), deadline, *as);
} else {
fut = ss::with_timeout(deadline, std::move(fut));
}

result<raft::replicate_result> result{{}};
try {
result = co_await std::move(fut);
} catch (const ss::timed_out_error&) {
result = errc::timeout;
fut = ssx::with_timeout_abortable(
std::move(fut), model::no_timeout, *as);
}

auto result = co_await std::move(fut);
if (!result) {
vlog(
_logger.warn,
"error on replicating remote segment metadata: {}",
result.error());
co_return result.error();
}

bool applied = false;
{
auto now = ss::lowres_clock::now();
if (now >= deadline) {
co_return errc::replication_error;
// If there was an error for whatever reason, it is unsafe to make any
// assumptions about whether batches were replicated or not. Explicitly
// step down if we're still leader and force callers to re-sync in a
// new term with a new leader.
if (_c->is_leader() && _c->term() == current_term) {
co_await _c->step_down(ssx::sformat(
"failed to replicate archival batch in term {}", current_term));
}
auto timeout = deadline - now;
applied = co_await wait_no_throw(result.value().last_offset, timeout);
co_return result.error();
}

auto applied = co_await wait_no_throw(
result.value().last_offset, model::no_timeout, as);
if (!applied) {
if (
as.has_value() && !as.value().get().abort_requested()
&& _c->is_leader() && _c->term() == current_term) {
co_await _c->step_down(ssx::sformat(
"failed to replicate archival batch in term {}", current_term));
}
co_return errc::replication_error;
}

Expand Down Expand Up @@ -383,7 +383,7 @@ ss::future<std::error_code> archival_metadata_stm::do_truncate(

auto batch = std::move(b).build();

auto ec = co_await do_replicate_commands(std::move(batch), deadline, as);
auto ec = co_await do_replicate_commands(std::move(batch), as);
if (ec) {
co_return ec;
}
Expand Down Expand Up @@ -439,7 +439,7 @@ ss::future<std::error_code> archival_metadata_stm::do_cleanup_metadata(

auto batch = std::move(b).build();

auto ec = co_await do_replicate_commands(std::move(batch), deadline, as);
auto ec = co_await do_replicate_commands(std::move(batch), as);
if (ec) {
co_return ec;
}
Expand Down Expand Up @@ -494,7 +494,7 @@ ss::future<std::error_code> archival_metadata_stm::do_add_segments(
}

auto batch = std::move(b).build();
auto ec = co_await do_replicate_commands(std::move(batch), deadline, as);
auto ec = co_await do_replicate_commands(std::move(batch), as);
if (ec) {
co_return ec;
}
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ class archival_metadata_stm final : public persisted_stm {
ss::lowres_clock::time_point,
std::optional<std::reference_wrapper<ss::abort_source>>);

/// NOTE: no deadline provided, as it is expected further updates to the
/// archiver will depend on all record batches having been applied.
ss::future<std::error_code> do_replicate_commands(
model::record_batch,
ss::lowres_clock::time_point,
std::optional<std::reference_wrapper<ss::abort_source>>);

ss::future<> apply(model::record_batch batch) override;
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ ss::future<bool> id_allocator_stm::set_state(
co_return false;
}
if (!co_await wait_no_throw(
model::offset(r.value().last_offset()), timeout)) {
model::offset(r.value().last_offset()),
model::timeout_clock::now() + timeout)) {
co_return false;
}
co_return true;
Expand Down
8 changes: 5 additions & 3 deletions src/v/cluster/persisted_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "storage/record_batch_builder.h"
#include "storage/snapshot.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

Expand Down Expand Up @@ -313,9 +314,10 @@ ss::future<bool> persisted_stm::sync(model::timeout_clock::duration timeout) {
}

ss::future<bool> persisted_stm::wait_no_throw(
model::offset offset, model::timeout_clock::duration timeout) {
auto deadline = model::timeout_clock::now() + timeout;
return wait(offset, deadline)
model::offset offset,
model::timeout_clock::time_point deadline,
std::optional<std::reference_wrapper<ss::abort_source>> as) {
return wait(offset, deadline, as)
.then([] { return true; })
.handle_exception_type([](const ss::abort_requested_exception&) {
// Shutting down
Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/persisted_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ class persisted_stm
*/
ss::future<> start() override;

ss::future<bool>
wait_no_throw(model::offset offset, model::timeout_clock::duration);
ss::future<bool> wait_no_throw(
model::offset offset,
model::timeout_clock::time_point,
std::optional<std::reference_wrapper<ss::abort_source>> = std::nullopt);

private:
ss::future<> wait_offset_committed(
Expand Down
31 changes: 21 additions & 10 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ ss::future<checked<model::term_id, tx_errc>> rm_stm::do_begin_tx(
}

if (!co_await wait_no_throw(
model::offset(r.value().last_offset()), _sync_timeout)) {
model::offset(r.value().last_offset()),
model::timeout_clock::now() + _sync_timeout)) {
vlog(
_ctx_log.trace,
"timeout on waiting until {} is applied (begin_tx pid:{} tx_seq:{})",
Expand Down Expand Up @@ -638,7 +639,8 @@ ss::future<tx_errc> rm_stm::do_prepare_tx(
}

if (!co_await wait_no_throw(
model::offset(r.value().last_offset()), timeout)) {
model::offset(r.value().last_offset()),
model::timeout_clock::now() + timeout)) {
if (_c->is_leader() && _c->term() == synced_term) {
co_await _c->step_down("prepare_tx apply error");
}
Expand Down Expand Up @@ -680,7 +682,8 @@ ss::future<tx_errc> rm_stm::do_commit_tx(
// catching up with all previous end_tx operations (commit | abort)
// to avoid writing the same commit | abort marker twice
if (_mem_state.last_end_tx >= model::offset{0}) {
if (!co_await wait_no_throw(_mem_state.last_end_tx, timeout)) {
if (!co_await wait_no_throw(
_mem_state.last_end_tx, model::timeout_clock::now() + timeout)) {
co_return tx_errc::stale;
}
}
Expand Down Expand Up @@ -806,7 +809,8 @@ ss::future<tx_errc> rm_stm::do_commit_tx(
}
co_return tx_errc::timeout;
}
if (!co_await wait_no_throw(r.value().last_offset, timeout)) {
if (!co_await wait_no_throw(
r.value().last_offset, model::timeout_clock::now() + timeout)) {
if (_c->is_leader() && _c->term() == synced_term) {
co_await _c->step_down("do_commit_tx wait error");
}
Expand Down Expand Up @@ -929,7 +933,8 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
// catching up with all previous end_tx operations (commit | abort)
// to avoid writing the same commit | abort marker twice
if (_mem_state.last_end_tx >= model::offset{0}) {
if (!co_await wait_no_throw(_mem_state.last_end_tx, timeout)) {
if (!co_await wait_no_throw(
_mem_state.last_end_tx, model::timeout_clock::now() + timeout)) {
vlog(
_ctx_log.trace,
"Can't catch up to abort pid:{} tx_seq:{}",
Expand Down Expand Up @@ -1030,7 +1035,8 @@ ss::future<tx_errc> rm_stm::do_abort_tx(
_mem_state.last_end_tx = r.value().last_offset;
}

if (!co_await wait_no_throw(r.value().last_offset, timeout)) {
if (!co_await wait_no_throw(
r.value().last_offset, model::timeout_clock::now() + timeout)) {
vlog(
_ctx_log.trace,
"timeout on waiting until {} is applied (abort_tx pid:{} tx_seq:{})",
Expand Down Expand Up @@ -1447,7 +1453,8 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
co_return r.error();
}
if (!co_await wait_no_throw(
model::offset(r.value().last_offset()), _sync_timeout)) {
model::offset(r.value().last_offset()),
model::timeout_clock::now() + _sync_timeout)) {
vlog(
_ctx_log.warn,
"application of the replicated tx batch has timed out pid:{}",
Expand Down Expand Up @@ -2084,7 +2091,9 @@ ss::future<> rm_stm::do_try_abort_old_tx(model::producer_identity pid) {
// catching up with all previous end_tx operations (commit | abort)
// to avoid writing the same commit | abort marker twice
if (_mem_state.last_end_tx >= model::offset{0}) {
if (!co_await wait_no_throw(_mem_state.last_end_tx, _sync_timeout)) {
if (!co_await wait_no_throw(
_mem_state.last_end_tx,
model::timeout_clock::now() + _sync_timeout)) {
co_return;
}
}
Expand Down Expand Up @@ -2166,7 +2175,8 @@ ss::future<> rm_stm::do_try_abort_old_tx(model::producer_identity pid) {
_mem_state.last_end_tx = cr.value().last_offset;
}
if (!co_await wait_no_throw(
cr.value().last_offset, _sync_timeout)) {
cr.value().last_offset,
model::timeout_clock::now() + _sync_timeout)) {
vlog(
_ctx_log.warn,
"Timed out on waiting for the commit marker to be "
Expand Down Expand Up @@ -2207,7 +2217,8 @@ ss::future<> rm_stm::do_try_abort_old_tx(model::producer_identity pid) {
_mem_state.last_end_tx = cr.value().last_offset;
}
if (!co_await wait_no_throw(
cr.value().last_offset, _sync_timeout)) {
cr.value().last_offset,
model::timeout_clock::now() + _sync_timeout)) {
vlog(
_ctx_log.warn,
"Timed out on waiting for the abort marker to be applied "
Expand Down
14 changes: 10 additions & 4 deletions src/v/cluster/tests/rm_stm_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,11 @@ FIXTURE_TEST(test_tx_aborted_tx_1, mux_state_machine_fixture) {

auto op = stm.abort_tx(pid2, tx_seq, 2'000ms).get0();
BOOST_REQUIRE_EQUAL(op, cluster::tx_errc::none);
BOOST_REQUIRE(
stm.wait_no_throw(_raft.get()->committed_offset(), 2'000ms).get0());
BOOST_REQUIRE(stm
.wait_no_throw(
_raft.get()->committed_offset(),
model::timeout_clock::now() + 2'000ms)
.get0());
aborted_txs = stm.aborted_transactions(min_offset, max_offset).get0();

BOOST_REQUIRE_EQUAL(aborted_txs.size(), 1);
Expand Down Expand Up @@ -331,8 +334,11 @@ FIXTURE_TEST(test_tx_aborted_tx_2, mux_state_machine_fixture) {

op = stm.abort_tx(pid2, tx_seq, 2'000ms).get0();
BOOST_REQUIRE_EQUAL(op, cluster::tx_errc::none);
BOOST_REQUIRE(
stm.wait_no_throw(_raft.get()->committed_offset(), 2'000ms).get0());
BOOST_REQUIRE(stm
.wait_no_throw(
_raft.get()->committed_offset(),
model::timeout_clock::now() + 2'000ms)
.get0());
aborted_txs = stm.aborted_transactions(min_offset, max_offset).get0();

BOOST_REQUIRE_EQUAL(aborted_txs.size(), 1);
Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/tm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ tm_stm::do_update_tx(tm_transaction tx, model::term_id term) {
}

auto offset = model::offset(r.value().last_offset());
if (!co_await wait_no_throw(offset, _sync_timeout)) {
if (!co_await wait_no_throw(
offset, model::timeout_clock::now() + _sync_timeout)) {
vlog(
txlog.info,
"timeout on waiting until {} is applied on updating tx:{} pid:{} "
Expand Down Expand Up @@ -597,7 +598,8 @@ ss::future<tm_stm::op_status> tm_stm::do_register_new_producer(
}

if (!co_await wait_no_throw(
model::offset(r.value().last_offset()), _sync_timeout)) {
model::offset(r.value().last_offset()),
model::timeout_clock::now() + _sync_timeout)) {
co_return tm_stm::op_status::unknown;
}
if (_c->term() != expected_term) {
Expand Down
8 changes: 5 additions & 3 deletions src/v/raft/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ ss::future<> state_machine::stop() {
}

ss::future<> state_machine::wait(
model::offset offset, model::timeout_clock::time_point timeout) {
return ss::with_gate(_gate, [this, timeout, offset] {
return _waiters.wait(offset, timeout, std::nullopt);
model::offset offset,
model::timeout_clock::time_point timeout,
std::optional<std::reference_wrapper<ss::abort_source>> as) {
return ss::with_gate(_gate, [this, timeout, offset, as] {
return _waiters.wait(offset, timeout, as);
});
}

Expand Down
6 changes: 5 additions & 1 deletion src/v/raft/state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ class state_machine {
virtual ss::future<> stop();

// wait until at least offset is applied to state machine
ss::future<> wait(model::offset, model::timeout_clock::time_point);
ss::future<> wait(
model::offset,
model::timeout_clock::time_point,
std::optional<std::reference_wrapper<ss::abort_source>> as
= std::nullopt);

/**
* This must be implemented by the state machine. The state machine should
Expand Down
Loading