From ff358ccfe4c4247881de9750c0475f60beeea1e9 Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Tue, 7 May 2024 16:58:53 +0100
Subject: [PATCH] archival: clamp uploads to committed offset

Archival/tiered storage correctness builds on the (wrong) assumption
that the LSO is monotonic. Tiered storage has no concept of suffix
truncation, so if the LSO ever moved backwards it would lead to
violations of the correctness properties and to diverging logs /
undefined behavior.

However, we have discovered that this property does not hold when
there are no in-progress transactions and acks=0/1 or write caching is
in use: in that case the LSO falls back to the "last visible
index"[^1], which can get truncated.

Ref https://github.com/redpanda-data/redpanda/issues/18244

[^1]: https://github.com/redpanda-data/redpanda/blob/88ac775f9f7954330732024abfa6e9ed5c9c11fd/src/v/cluster/rm_stm.cc#L1322
---
 src/v/archival/ntp_archiver_service.cc | 29 +++++++++++++++++++-------
 src/v/archival/ntp_archiver_service.h  | 16 +++++++++++---
 2 files changed, 35 insertions(+), 10 deletions(-)
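For illustration, the clamping rule introduced below as a minimal
standalone C++ sketch; offset_t and partition_view are hypothetical
stand-ins for model::offset and the partition state the archiver reads
through _parent:

    #include <algorithm>
    #include <cstdint>

    // Hypothetical stand-ins: the real code uses model::offset and reads
    // the values via _parent.last_stable_offset() and
    // _parent.committed_offset().
    using offset_t = int64_t;

    struct partition_view {
        offset_t last_stable_offset; // may fall back to the last visible index
        offset_t committed_offset;   // replicated to a majority, never truncated
    };

    // Same idea as ntp_archiver::max_uploadable_offset_exclusive(): take the
    // LSO-based bound but never allow uploads past the committed offset.
    // The "+ 1" plays the role of model::next_offset(): the result is the
    // first offset that is NOT eligible for upload (an exclusive bound).
    offset_t max_uploadable_offset_exclusive(const partition_view& p) {
        return std::min(p.last_stable_offset, p.committed_offset + 1);
    }

Clamping by the committed offset keeps the upload bound within data that
can no longer be suffix-truncated, which is what the safety argument
above needs.
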
diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc
index 8c9ff012b53c..39d8e3a6aba7 100644
--- a/src/v/archival/ntp_archiver_service.cc
+++ b/src/v/archival/ntp_archiver_service.cc
@@ -1618,7 +1618,7 @@ ntp_archiver::schedule_single_upload(const upload_context& upload_ctx) {
 }
 
 ss::future<std::vector<ss::future<ntp_archiver::scheduled_upload>>>
-ntp_archiver::schedule_uploads(model::offset last_stable_offset) {
+ntp_archiver::schedule_uploads(model::offset max_offset_exclusive) {
     // We have to increment last offset to guarantee progress.
     // The manifest's last offset contains dirty_offset of the
     // latest uploaded segment but '_policy' requires offset that
@@ -1642,7 +1642,7 @@
         params.push_back({
           .upload_kind = segment_upload_kind::non_compacted,
           .start_offset = start_upload_offset,
-          .last_offset = last_stable_offset,
+          .last_offset = max_offset_exclusive,
           .allow_reuploads = allow_reuploads_t::no,
           .archiver_term = _start_term,
         });
@@ -2026,15 +2026,30 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
       .compacted_upload_result = compacted_result};
 }
 
+model::offset ntp_archiver::max_uploadable_offset_exclusive() const {
+    // We impose an additional (LSO) constraint on the uploadable offset
+    // as we need to have a complete index of aborted transactions, if any,
+    // before we can upload a segment.
+    return std::min(
+      _parent.last_stable_offset(),
+      model::next_offset(_parent.committed_offset()));
+}
+
 ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
-  std::optional<model::offset> lso_override) {
-    vlog(_rtclog.debug, "Uploading next candidates called for {}", _ntp);
-    auto last_stable_offset = lso_override ? *lso_override
-                                           : _parent.last_stable_offset();
+  std::optional<model::offset> max_offset_override_exclusive) {
+    auto max_offset_exclusive = max_offset_override_exclusive
+                                  ? *max_offset_override_exclusive
+                                  : max_uploadable_offset_exclusive();
+    vlog(
+      _rtclog.debug,
+      "Uploading next candidates called for {} with max_offset_exclusive={}",
+      _ntp,
+      max_offset_exclusive);
     ss::gate::holder holder(_gate);
     try {
         auto units = co_await ss::get_units(_mutex, 1, _as);
-        auto scheduled_uploads = co_await schedule_uploads(last_stable_offset);
+        auto scheduled_uploads = co_await schedule_uploads(
+          max_offset_exclusive);
         co_return co_await wait_all_scheduled_uploads(
           std::move(scheduled_uploads));
     } catch (const ss::gate_closed_exception&) {
diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h
index 4d0e15c4fb9d..69cdd61b7f83 100644
--- a/src/v/archival/ntp_archiver_service.h
+++ b/src/v/archival/ntp_archiver_service.h
@@ -175,15 +175,25 @@ class ntp_archiver {
         auto operator<=>(const batch_result&) const = default;
     };
 
+    /// Compute the maximum offset that is safe to upload to the cloud.
+    ///
+    /// It must be guaranteed that this offset is monotonic and can never
+    /// go backwards. Otherwise, the local and cloud logs will diverge,
+    /// leading to undefined behavior.
+    model::offset max_uploadable_offset_exclusive() const;
+
     /// \brief Upload next set of segments to S3 (if any)
     /// The semaphore is used to track number of parallel uploads. The method
     /// will pick not more than '_concurrency' candidates and start
     /// uploading them.
     ///
-    /// \param lso_override last stable offset override
+    /// \param max_offset_override_exclusive Overrides the maximum offset
+    ///   that can be uploaded. If nullopt, the maximum offset is
+    ///   calculated automatically.
     /// \return future that returns number of uploaded/failed segments
     virtual ss::future<batch_result> upload_next_candidates(
-      std::optional<model::offset> last_stable_offset_override = std::nullopt);
+      std::optional<model::offset> max_offset_override_exclusive
+      = std::nullopt);
 
     ss::future<std::error_code> sync_manifest();
 
@@ -422,7 +432,7 @@
 
     /// Start all uploads
     ss::future<std::vector<ss::future<scheduled_upload>>>
-    schedule_uploads(model::offset last_stable_offset);
+    schedule_uploads(model::offset max_offset_exclusive);
 
     ss::future<std::vector<ss::future<scheduled_upload>>>
     schedule_uploads(std::vector<upload_context> loop_contexts);
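
A small self-contained check of the scenario described in the commit
message (hypothetical offset values and types, not code from this
patch):

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    using offset_t = int64_t;

    struct observation {
        offset_t last_stable_offset; // may fall back to the last visible index
        offset_t committed_offset;   // never moves backwards
    };

    // Mirrors the idea of max_uploadable_offset_exclusive(): clamp the
    // LSO-based bound by committed_offset + 1.
    offset_t clamped_bound(const observation& o) {
        return std::min(o.last_stable_offset, o.committed_offset + 1);
    }

    int main() {
        // With acks=0/1 or write caching the LSO fallback (last visible
        // index) runs ahead of the committed offset; a leadership change
        // then truncates it back towards the committed offset.
        observation before{.last_stable_offset = 120, .committed_offset = 100};
        observation after{.last_stable_offset = 106, .committed_offset = 105};

        // The raw LSO moved backwards (120 -> 106)...
        assert(after.last_stable_offset < before.last_stable_offset);
        // ...but the clamped upload bound did not (101 -> 106).
        assert(clamped_bound(after) >= clamped_bound(before));
        return 0;
    }

In this scenario the raw LSO would have allowed uploading offsets
beyond the committed offset that later disappear from the log, while
the clamped bound never exceeds committed data.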