Skip to content

Commit

Permalink
Merge pull request #12316 from Lazin/pr/spillover-race-prevention
Browse files Browse the repository at this point in the history
cloud_storage: Spillover race prevention
  • Loading branch information
Lazin authored Jul 26, 2023
2 parents 21c8ea9 + e8d36cf commit 02c841c
Show file tree
Hide file tree
Showing 8 changed files with 729 additions and 226 deletions.
4 changes: 2 additions & 2 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2142,7 +2142,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
while (cursor->get_status()
== cloud_storage::async_manifest_view_cursor_status::
materialized_spillover) {
auto stop = cursor->with_manifest(
auto stop = co_await cursor->with_manifest(
[&](const cloud_storage::partition_manifest& manifest) {
for (const auto& meta : manifest) {
if (meta.committed_offset < clean_offset) {
Expand Down Expand Up @@ -2184,7 +2184,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
if (stop) {
break;
}
auto path = cursor->manifest()->get().get_manifest_path();
auto path = cursor->manifest()->get_manifest_path();
manifests_to_remove.push_back(path());
auto res = co_await cursor->next();
if (res.has_failure()) {
Expand Down
92 changes: 68 additions & 24 deletions src/v/cloud_storage/async_manifest_view.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,31 @@ async_manifest_view_cursor::get_status() const {
});
}

/// Re-seek the cursor when manifest_needs_sync() reports a mismatch.
///
/// Does nothing if no sync is needed. Otherwise the cursor seeks to the
/// remembered STM start offset. A failed seek is rethrown as
/// std::system_error; a seek that finds nothing is logged and replaces the
/// current manifest with stale_manifest.
ss::future<> async_manifest_view_cursor::maybe_sync_manifest() {
    if (!manifest_needs_sync()) {
        co_return;
    }
    auto seek_result = co_await seek(_stm_start_offset.value());
    if (seek_result.has_failure()) {
        throw std::system_error(seek_result.error());
    }
    const bool found = seek_result.value();
    if (found) {
        co_return;
    }
    vlog(_view._ctxlog.error, "Can't sync manifest");
    _current = stale_manifest();
}

/// Check whether the STM manifest under the cursor still starts at the
/// offset that was recorded in _stm_start_offset.
///
/// \return true if the cursor holds the STM manifest and its start offset
///         differs from the recorded one, false otherwise
bool async_manifest_view_cursor::manifest_needs_sync() const {
    if (!std::holds_alternative<stm_manifest_t>(_current)) {
        // Only the STM manifest is compared against a recorded offset
        return false;
    }
    // Invariant: if _current points to the STM manifest the
    // _stm_start_offset is set
    vassert(_stm_start_offset.has_value(), "STM start offset is not set");
    const auto& stm = std::get<stm_manifest_t>(_current).get();
    const auto actual_start = stm.get_start_offset().value_or(
      model::offset{});
    return actual_start != _stm_start_offset.value();
}

ss::future<result<bool, error_outcome>>
async_manifest_view_cursor::seek(async_view_search_query_t q) {
if (std::holds_alternative<model::offset>(q)) {
Expand Down Expand Up @@ -151,15 +176,15 @@ async_manifest_view_cursor::seek(async_view_search_query_t q) {
vlog(
_view._ctxlog.debug,
"Seeking STM manifest [{}-{}]",
p.get().get_start_offset().value(),
p.get().get_start_offset(),
p.get().get_last_offset());
return contains(p, q);
},
[this, q](const ss::shared_ptr<materialized_manifest>& m) {
vlog(
_view._ctxlog.debug,
"Seeking spillover manifest [{}-{}]",
m->manifest.get_start_offset().value(),
m->manifest.get_start_offset(),
m->manifest.get_last_offset());
return contains(m->manifest, q);
});
Expand Down Expand Up @@ -188,6 +213,17 @@ async_manifest_view_cursor::seek(async_view_search_query_t q) {
co_return false;
}
_current = res.value();
if (std::holds_alternative<stm_manifest_t>(_current)) {
// Invariant: if cursor points to the STM manifest _stm_start_offset is
// set to expected base offset
auto so = std::get<stm_manifest_t>(_current)
.get()
.get_start_offset()
.value_or(model::offset{});
_stm_start_offset = std::clamp(so, _begin, _end);
} else {
_stm_start_offset = std::nullopt;
}
_timer.rearm(_idle_timeout + ss::lowres_clock::now());
co_return true;
}
Expand All @@ -201,11 +237,25 @@ bool async_manifest_view_cursor::manifest_in_range(
// A plain reference is the STM manifest (see stm_manifest_t), so the log
// line is labelled accordingly.
[this](std::reference_wrapper<const partition_manifest> p) {
    auto so = p.get().get_start_offset().value_or(model::offset{});
    auto lo = p.get().get_last_offset();
    vlog(
      _view._ctxlog.debug,
      "STM manifest range: [{}/{}], cursor range: [{}/{}]",
      so,
      lo,
      _begin,
      _end);
    // The ranges overlap unless one lies entirely before the other
    return !(_end < so || _begin > lo);
},
// A materialized manifest is a spillover manifest.
[this](const ss::shared_ptr<materialized_manifest>& m) {
    auto so = m->manifest.get_start_offset().value_or(model::offset{});
    auto lo = m->manifest.get_last_offset();
    vlog(
      _view._ctxlog.debug,
      "Spill manifest range: [{}/{}], cursor range: [{}/{}]",
      so,
      lo,
      _begin,
      _end);
    // The ranges overlap unless one lies entirely before the other
    return !(_end < so || _begin > lo);
});
}
Expand All @@ -222,16 +272,6 @@ async_manifest_view_cursor::next() {
return model::next_offset(m->manifest.get_last_offset());
});

// TODO: address the general race:
// - spillover manifests are [0, 10], [11, 20], [21, 30]
// - STM manifest has segments [31, 40], [41, 50], [51, 60]
// - cursor is iterated over through the spillover manifests
// - next() called, moving cursor to offset 31, and STM manifest is
// selected and "materialized"
// - STM spills over to create new manifest [31, 50], new STM manifest only
// has [51, 60]
// - users of the cursor see manifest starting at offset 51 instead of 31

if (next_base_offset == EOS || next_base_offset > _end) {
co_return eof::yes;
}
Expand All @@ -243,6 +283,11 @@ async_manifest_view_cursor::next() {
co_return error_outcome::out_of_range;
}
_current = manifest.value();
if (std::holds_alternative<stm_manifest_t>(_current)) {
// Invariant: if cursor points to the STM manifest _stm_start_offset is
// set to expected base offset
_stm_start_offset = std::clamp(next_base_offset, _begin, _end);
}
_timer.rearm(_idle_timeout + ss::lowres_clock::now());
co_return eof::no;
}
Expand All @@ -256,13 +301,12 @@ ss::future<ss::stop_iteration> async_manifest_view_cursor::next_iter() {
: ss::stop_iteration::no;
}

std::optional<std::reference_wrapper<const partition_manifest>>
ssx::task_local_ptr<const partition_manifest>
async_manifest_view_cursor::manifest() const {
using ret_t
= std::optional<std::reference_wrapper<const partition_manifest>>;
using ret_t = ssx::task_local_ptr<const partition_manifest>;
return ss::visit(
_current,
[](std::monostate) -> ret_t { return std::nullopt; },
[](std::monostate) -> ret_t { return {}; },
[this](stale_manifest) -> ret_t {
auto errc = make_error_code(error_outcome::timed_out);
throw std::system_error(
Expand All @@ -273,10 +317,10 @@ async_manifest_view_cursor::manifest() const {
_view.get_ntp()));
},
[](std::reference_wrapper<const partition_manifest> m) -> ret_t {
return m;
return ret_t(&m.get());
},
[](const ss::shared_ptr<materialized_manifest>& m) -> ret_t {
return std::ref(m->manifest);
return ret_t(&m->manifest);
});
}

Expand Down Expand Up @@ -682,7 +726,7 @@ async_manifest_view::get_term_last_offset(model::term_id term) noexcept {
std::optional<kafka::offset> res_offset;
co_await ss::repeat(
[this, &res_offset, term, cursor = std::move(res.value())] {
const auto& manifest = cursor->manifest()->get();
const auto& manifest = *cursor->manifest();
vlog(
_ctxlog.debug,
"Scanning manifest {} for term {}",
Expand Down Expand Up @@ -841,7 +885,7 @@ async_manifest_view::offset_based_retention() noexcept {
_ctxlog.debug,
"There is no segment old enough to be removed by retention");
} else {
const auto& manifest = res.value()->manifest()->get();
const auto& manifest = *res.value()->manifest();
vassert(
!manifest.empty(),
"{} Spillover manifest can't be empty",
Expand Down Expand Up @@ -910,7 +954,7 @@ async_manifest_view::time_based_retention(
while (
cursor->get_status()
== async_manifest_view_cursor_status::materialized_spillover) {
auto eof = cursor->with_manifest(
auto eof = co_await cursor->with_manifest(
[boundary, &result](const partition_manifest& manifest) {
for (const auto& meta : manifest) {
if (meta.max_timestamp > boundary) {
Expand Down Expand Up @@ -961,7 +1005,7 @@ async_manifest_view::time_based_retention(
"Failed to find the retention boundary, the manifest {} "
"doesn't "
"have any matching segment",
cursor->manifest()->get().get_manifest_path());
cursor->manifest()->get_manifest_path());
}
}
} catch (...) {
Expand Down Expand Up @@ -1025,9 +1069,9 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept {
// go to the STM manifest and should only include archive.
// The end condition is the lambda returned true, otherwise
// we should keep scanning.
auto eof = cursor->with_manifest(
auto eof = co_await cursor->with_manifest(
[this, &to_remove, &result, clean_offset](
const auto& manifest) mutable {
const partition_manifest& manifest) mutable {
for (const auto& meta : manifest) {
// Skip segments below the clean offset as they're
// already eligible for GC. The reason why we are
Expand Down
124 changes: 118 additions & 6 deletions src/v/cloud_storage/async_manifest_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "cloud_storage_clients/types.h"
#include "model/metadata.h"
#include "model/timestamp.h"
#include "ssx/task_local_ptr.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/abort_source.hh>
Expand All @@ -30,6 +31,7 @@
#include <chrono>
#include <exception>
#include <map>
#include <type_traits>
#include <variant>

namespace cloud_storage {
Expand Down Expand Up @@ -277,9 +279,13 @@ class async_manifest_view_cursor {

/// Return current manifest
///
/// \note the manifest is only valid during current time-slice and
/// could be invalidated after any scheduling point. The pointer
/// will be invalidated to prevent this from happening unnoticed.
/// The caller shouldn't dereference and cache the raw pointer to
/// the manifest anywhere.
/// \return pointer to the current manifest or an empty pointer
std::optional<std::reference_wrapper<const partition_manifest>>
manifest() const;
ssx::task_local_ptr<const partition_manifest> manifest() const;

/// Pass current manifest to the functor
///
Expand All @@ -288,13 +294,45 @@ class async_manifest_view_cursor {
/// any scheduling point. The reference will be invalidated in this
/// case.
template<class Fn>
auto with_manifest(Fn fn) {
auto ref = manifest();
vassert(ref.has_value(), "Invalid cursor, {}", _view.get_ntp());
return fn(ref->get());
auto with_manifest(Fn fn)
-> ss::future<decltype(fn(std::declval<const partition_manifest>()))> {
// Upper bound on the number of loop iterations, so that repeated
// re-syncs can't keep the caller spinning forever.
int quota = 4;
while (quota-- > 0) {
if (manifest_needs_sync()) {
// If the concurrent spillover event happens interim we will
// retry the operation after re-fetching the manifest.
co_await maybe_sync_manifest();
continue;
}
// No sync is needed at this point. There is no co_await between
// fetching the pointer and invoking the functor.
auto ref = manifest();
if (!ref.has_value()) {
throw std::runtime_error(fmt_with_ctx(
fmt::format, "Invalid cursor, {}", _view.get_ntp()));
}
co_return fn(*ref);
}
// This shouldn't happen normally because spillover events are supposed
// to be rare. This might happen though when the spillover was disabled
// and then enabled for the first time. In this case ntp_archiver will
// spill several manifests in a row and the reader might get stuck.
throw std::runtime_error(fmt_with_ctx(
fmt::format,
"Failed to fetch manifest because concurrent spills, {}",
_view.get_ntp()));
}

/// Make sure that the cursor points to the right manifest
///
/// If the cursor points at the STM manifest it may get out
/// of range after a scheduling point.
ss::future<> maybe_sync_manifest();

/// Returns 'true' if the manifest was changed concurrently
/// and the user could see a gap in segments.
bool manifest_needs_sync() const;

private:
using stm_manifest_t = std::reference_wrapper<const partition_manifest>;
void on_timeout();

bool manifest_in_range(const manifest_section_t& m);
Expand All @@ -308,6 +346,80 @@ class async_manifest_view_cursor {
ss::timer<ss::lowres_clock> _timer;
const model::offset _begin;
const model::offset _end;
std::optional<model::offset> _stm_start_offset{std::nullopt};
};

/// Invoke functor with every segment_meta accessible by the cursor
///
/// The functor is expected to return stop_iteration::yes if the processing
/// is completed. Once it does, the whole iteration ends: the cursor is not
/// advanced and the functor is not invoked again.
///
/// \param cursor is the cursor to iterate with, it is consumed by the call
/// \param func is a functor that accepts a segment_meta and returns
///        ss::stop_iteration
/// \throws std::runtime_error if the manifest still needs a sync after the
///         retry quota is exhausted
template<class Fn>
ss::future<>
for_each_segment(std::unique_ptr<async_manifest_view_cursor> cursor, Fn func) {
    while (true) {
        // Bounded number of re-sync attempts per manifest
        int quota = 4;
        while (quota-- > 0) {
            if (cursor->manifest_needs_sync()) {
                // If a concurrent spillover event happened in the meantime
                // we retry after re-fetching the manifest.
                co_await cursor->maybe_sync_manifest();
                continue;
            }
            break;
        }
        if (cursor->manifest_needs_sync()) {
            throw std::runtime_error(fmt_with_ctx(
              fmt::format,
              "Failed to iterate to the next manifest due to contention"));
        }
        // Invoke functor
        for (const auto& meta : *cursor->manifest()) {
            auto stop = func(meta);
            if (stop == ss::stop_iteration::yes) {
                // Leave the function, not just the per-manifest loop.
                // A plain 'break' here would move on to the next manifest
                // and keep calling the functor after it asked to stop.
                co_return;
            }
        }
        if (co_await cursor->next_iter() == ss::stop_iteration::yes) {
            break;
        }
    }
}

/// Visit every manifest reachable through the cursor
///
/// The functor is called once per manifest and should return
/// stop_iteration::yes when no further manifests are needed.
/// \param cursor is an initialized cursor, the cursor should be pointing
///               somewhere
/// \param func is a functor that accepts
///             ssx::task_local_ptr<const partition_manifest>
/// \throws std::runtime_error if the manifest still needs a sync after the
///         retry quota is exhausted
template<class Fn>
ss::future<>
for_each_manifest(std::unique_ptr<async_manifest_view_cursor> cursor, Fn func) {
    constexpr int max_sync_attempts = 4;
    for (;;) {
        // Re-fetch the manifest while a concurrent spillover keeps moving
        // it, giving up after a bounded number of attempts.
        for (int attempt = 0;
             attempt < max_sync_attempts && cursor->manifest_needs_sync();
             ++attempt) {
            co_await cursor->maybe_sync_manifest();
        }
        if (cursor->manifest_needs_sync()) {
            throw std::runtime_error(fmt_with_ctx(
              fmt::format,
              "Failed to iterate to the next manifest due to contention"));
        }

        // Hand the current manifest to the functor
        if (func(cursor->manifest()) == ss::stop_iteration::yes) {
            co_return;
        }
        if (co_await cursor->next_iter() == ss::stop_iteration::yes) {
            co_return;
        }
    }
}

} // namespace cloud_storage
Loading

0 comments on commit 02c841c

Please sign in to comment.