Skip to content

Commit

Permalink
Merge pull request #15816 from nvartolomei/nv/backport-15677-v23.2.x-768
Browse files Browse the repository at this point in the history
[v23.2.x] c/archival_stm: do not reset _last_replicate on timeout
  • Loading branch information
nvartolomei authored Jan 3, 2024
2 parents dd0079e + 2405b16 commit e909bf6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
32 changes: 24 additions & 8 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
#include "vlog.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/defer.hh>

#include <algorithm>
#include <optional>

namespace cluster {

Expand Down Expand Up @@ -585,18 +587,16 @@ archival_metadata_stm::sync(model::timeout_clock::duration timeout) {
// below. If replication failed, exit unsuccessfully.

if (_last_replicate) {
auto fut = std::exchange(_last_replicate, std::nullopt).value();

if (!fut.available()) {
if (!_last_replicate->result.available()) {
vlog(
_logger.debug, "Waiting for ongoing replication before syncing");
}

try {
const auto before = model::timeout_clock::now();

const auto res = co_await ss::with_timeout(
before + timeout, std::move(fut));
const auto res = co_await _last_replicate->result.get_future(
before + timeout);

const auto after = model::timeout_clock::now();
// Update the timeout while accounting for under/overflow.
Expand All @@ -610,15 +610,30 @@ archival_metadata_stm::sync(model::timeout_clock::duration timeout) {

timeout -= duration;

// If we've got this far it means that the _last_replicate future
// was resolved. If it resolved successfully, then we can continue.
// If it failed for any reason, it is safe to "forget" about it
// only if the term changed. Otherwise we can't make any assumptions
// about its state.
// Stepping down (liveness) is guaranteed by the logic in the
// `do_replicate_commands` method.
if (res || _last_replicate->term < _raft->term()) {
_last_replicate = std::nullopt;
}

if (!res) {
vlog(
_logger.warn,
"Replication failed for archival STM command: {}",
res.error());

co_return false;
}
} catch (const ss::timed_out_error&) {
vlog(_logger.error, "Replication wait for archival STM timed out");
vlog(
_logger.warn,
"Replication wait for archival STM timed out (timeout = {})",
timeout);
co_return false;
} catch (...) {
vlog(
Expand Down Expand Up @@ -695,8 +710,9 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(

const auto current_term = _insync_term;

ss::promise<result<raft::replicate_result>> replication_promise;
_last_replicate = replication_promise.get_future();
ss::shared_promise<result<raft::replicate_result>> replication_promise;
_last_replicate = last_replicate{
.term = current_term, .result = replication_promise.get_shared_future()};

auto fut
= _raft
Expand Down
7 changes: 6 additions & 1 deletion src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_storage/types.h"
#include "cluster/persisted_stm.h"
#include "features/fwd.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record.h"
#include "storage/record_batch_builder.h"
Expand Down Expand Up @@ -301,7 +302,11 @@ class archival_metadata_stm final : public persisted_stm<> {
model::offset _last_dirty_at;

// The last replication future
std::optional<ss::future<result<raft::replicate_result>>> _last_replicate;
// Bookkeeping for the most recent replication started by
// do_replicate_commands(). sync() waits on `result` and uses `term` to decide
// whether a failed replication may be forgotten.
struct last_replicate {
// Term captured by do_replicate_commands() when the replication was
// started; sync() compares it against the current raft term.
model::term_id term;
// Shared future for the replication outcome. sync() obtains a
// timeout-bounded future from it via get_future(), so a timed-out wait
// does not consume or reset the stored state.
ss::shared_future<result<raft::replicate_result>> result;
};
std::optional<last_replicate> _last_replicate;

cloud_storage::remote& _cloud_storage_api;
features::feature_table& _feature_table;
Expand Down
1 change: 1 addition & 0 deletions tests/rptest/services/redpanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ def _load_s3_context(self, logger, test_context):
self.endpoint_url = None # None so boto auto-gens the endpoint url
self.cloud_storage_disable_tls = False # SI will fail to create archivers if tls is disabled
self.cloud_storage_region = cloud_storage_region
self.cloud_storage_api_endpoint_port = 443
elif cloud_storage_credentials_source == 'config_file' and cloud_storage_access_key and cloud_storage_secret_key:
logger.info("Running on AWS S3, setting credentials")
self.cloud_storage_access_key = cloud_storage_access_key
Expand Down

0 comments on commit e909bf6

Please sign in to comment.