diff --git a/src/v/cluster/archival_metadata_stm.cc b/src/v/cluster/archival_metadata_stm.cc index c351bc268eb2..4372cdfeb06f 100644 --- a/src/v/cluster/archival_metadata_stm.cc +++ b/src/v/cluster/archival_metadata_stm.cc @@ -36,10 +36,12 @@ #include "vlog.h" #include +#include #include #include #include +#include namespace cluster { @@ -585,9 +587,7 @@ archival_metadata_stm::sync(model::timeout_clock::duration timeout) { // below. If replication failed, exit unsuccessfully. if (_last_replicate) { - auto fut = std::exchange(_last_replicate, std::nullopt).value(); - - if (!fut.available()) { + if (!_last_replicate->result.available()) { vlog( _logger.debug, "Waiting for ongoing replication before syncing"); } @@ -595,8 +595,8 @@ archival_metadata_stm::sync(model::timeout_clock::duration timeout) { try { const auto before = model::timeout_clock::now(); - const auto res = co_await ss::with_timeout( - before + timeout, std::move(fut)); + const auto res = co_await _last_replicate->result.get_future( + before + timeout); const auto after = model::timeout_clock::now(); // Update the timeout whille accounting for under/overflow. @@ -610,15 +610,30 @@ archival_metadata_stm::sync(model::timeout_clock::duration timeout) { timeout -= duration; + // If we've got this far it means that the _last_replicate future + // was resolved. If it resolved successfully, then we can continue. + // If it failed for any reason, it is safe to "forget" about it in + // only if the term changed. Otherwise we can't make any assumptions + // about its state. + // Stepping down (liveness) is guaranteed by the logic in the + // `do_replicate_commands` method. + if (res || _last_replicate->term < _raft->term()) { + _last_replicate = std::nullopt; + } + if (!res) { vlog( _logger.warn, "Replication failed for archival STM command: {}", res.error()); + co_return false; } } catch (const ss::timed_out_error&) { - vlog(_logger.error, "Replication wait for archival STM timed out"); + vlog( + _logger.warn, + "Replication wait for archival STM timed out (timeout = {})", + timeout); co_return false; } catch (...) { vlog( @@ -695,8 +710,9 @@ ss::future archival_metadata_stm::do_replicate_commands( const auto current_term = _insync_term; - ss::promise> replication_promise; - _last_replicate = replication_promise.get_future(); + ss::shared_promise> replication_promise; + _last_replicate = last_replicate{ + .term = current_term, .result = replication_promise.get_shared_future()}; auto fut = _raft diff --git a/src/v/cluster/archival_metadata_stm.h b/src/v/cluster/archival_metadata_stm.h index da2f704dde65..6e4f703d6c06 100644 --- a/src/v/cluster/archival_metadata_stm.h +++ b/src/v/cluster/archival_metadata_stm.h @@ -16,6 +16,7 @@ #include "cloud_storage/types.h" #include "cluster/persisted_stm.h" #include "features/fwd.h" +#include "model/fundamental.h" #include "model/metadata.h" #include "model/record.h" #include "storage/record_batch_builder.h" @@ -301,7 +302,11 @@ class archival_metadata_stm final : public persisted_stm<> { model::offset _last_dirty_at; // The last replication future - std::optional>> _last_replicate; + struct last_replicate { + model::term_id term; + ss::shared_future> result; + }; + std::optional _last_replicate; cloud_storage::remote& _cloud_storage_api; features::feature_table& _feature_table; diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 730a4b4b7f1c..00b57dc3c3d8 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -490,6 +490,7 @@ def _load_s3_context(self, logger, test_context): self.endpoint_url = None # None so boto auto-gens the endpoint url self.cloud_storage_disable_tls = False # SI will fail to create archivers if tls is disabled self.cloud_storage_region = cloud_storage_region + self.cloud_storage_api_endpoint_port = 443 elif cloud_storage_credentials_source == 'config_file' and cloud_storage_access_key and cloud_storage_secret_key: logger.info("Running on AWS S3, setting credentials") self.cloud_storage_access_key = cloud_storage_access_key