Skip to content

Commit

Permalink
Merge pull request redpanda-data#11795 from VladLazar/retention-fixes-quick
Browse files Browse the repository at this point in the history

cloud_storage: allow STM retention to proceed and more archive retention fixes
  • Loading branch information
piyushredpanda authored Jun 30, 2023
2 parents ca7b5ef + 18df60c commit b6e63b8
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 46 deletions.
3 changes: 2 additions & 1 deletion src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() {

auto cursor = std::move(backlog.value());

using eof = cloud_storage::async_manifest_view_cursor::eof;
while (cursor->get_status()
== cloud_storage::async_manifest_view_cursor_status::
materialized_spillover) {
Expand Down Expand Up @@ -1984,7 +1985,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
"Failed to load next spillover manifest: {}",
res.error());
break;
} else if (res.value() == false) {
} else if (res.value() == eof::yes) {
// End of stream
break;
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/archival/tests/archival_metadata_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -711,9 +711,9 @@ FIXTURE_TEST(test_archival_stm_spillover, archival_metadata_stm_fixture) {
model::offset{0}, 0, ss::lowres_clock::now() + 10s, never_abort)
.get();
BOOST_REQUIRE_EQUAL(
archival_stm->manifest().get_archive_start_offset(), model::offset(0));
archival_stm->manifest().get_archive_start_offset(), model::offset{});
BOOST_REQUIRE_EQUAL(
archival_stm->manifest().get_archive_clean_offset(), model::offset(0));
archival_stm->manifest().get_archive_clean_offset(), model::offset{});

// unaligned spillover command shouldn't remove segment
archival_stm
Expand Down
12 changes: 9 additions & 3 deletions src/v/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,23 @@ FIXTURE_TEST(test_archive_retention, archiver_fixture) {
auto fut = archiver.garbage_collect_archive();
tests::cooperative_spin_wait_with_timeout(5s, [this, part]() mutable {
const auto& manifest = part->archival_meta_stm()->manifest();
bool archive_clean_moved = manifest.get_archive_clean_offset()
== model::offset{2000};
bool archive_clean_offset_reset = manifest.get_archive_clean_offset()
== model::offset{};
bool archive_start_offset_reset = manifest.get_archive_start_offset()
== model::offset{};
bool deletes_sent = std::count_if(
get_targets().begin(),
get_targets().end(),
[](auto it) { return it.second.has_q_delete; })
== 2;
return archive_clean_moved && deletes_sent;
return archive_clean_offset_reset && archive_start_offset_reset
&& deletes_sent;
}).get();
fut.get();

BOOST_REQUIRE_EQUAL(
part->archival_meta_stm()->manifest().get_spillover_map().size(), 0);

ss::sstring delete_payloads;
for (const auto& [url, req] : get_targets()) {
if (req.has_q_delete) {
Expand Down
97 changes: 81 additions & 16 deletions src/v/cloud_storage/async_manifest_view.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ bool async_manifest_view_cursor::manifest_in_range(
});
}

ss::future<result<bool, error_outcome>> async_manifest_view_cursor::next() {
ss::future<result<async_manifest_view_cursor::eof, error_outcome>>
async_manifest_view_cursor::next() {
static constexpr auto EOS = model::offset{};
auto next_base_offset = ss::visit(
_current,
Expand All @@ -222,27 +223,27 @@ ss::future<result<bool, error_outcome>> async_manifest_view_cursor::next() {
});

if (next_base_offset == EOS || next_base_offset > _end) {
co_return false;
co_return eof::yes;
}
auto manifest = co_await _view.get_materialized_manifest(next_base_offset);
if (manifest.has_failure()) {
co_return manifest.as_failure();
}
if (unlikely(!manifest_in_range(manifest.value()))) {
co_return false;
co_return error_outcome::out_of_range;
}
_current = manifest.value();
_timer.rearm(_idle_timeout + ss::lowres_clock::now());
co_return true;
co_return eof::no;
}

/// Adapts next() to a ss::stop_iteration result: awaits next() and converts
/// its value into stop_iteration::yes / stop_iteration::no.
///
/// Throws std::system_error constructed from the error code when next()
/// reports a failure.
///
/// NOTE(review): this span is taken from a rendered diff, so two co_return
/// statements appear back to back: one comparing the result against `true`
/// and one comparing it against `eof::yes`. As written only the first can
/// execute — confirm which single statement belongs in the real file.
ss::future<ss::stop_iteration> async_manifest_view_cursor::next_iter() {
auto res = co_await next();
if (res.has_failure()) {
// next() reports errors through its result; rethrow them as an exception.
throw std::system_error(res.error());
}
co_return res.value() == true ? ss::stop_iteration::yes
: ss::stop_iteration::no;
// Stop iterating once next() signals eof::yes; otherwise keep going.
co_return res.value() == eof::yes ? ss::stop_iteration::yes
: ss::stop_iteration::no;
}

std::optional<std::reference_wrapper<const partition_manifest>>
Expand Down Expand Up @@ -479,7 +480,9 @@ ss::future<> async_manifest_view::run_bg_loop() {
}

ss::future<result<std::unique_ptr<async_manifest_view_cursor>, error_outcome>>
async_manifest_view::get_cursor(async_view_search_query_t query) noexcept {
async_manifest_view::get_cursor(
async_view_search_query_t query,
std::optional<model::offset> end_inclusive) noexcept {
try {
ss::gate::holder h(_gate);
if (
Expand All @@ -494,12 +497,24 @@ async_manifest_view::get_cursor(async_view_search_query_t query) noexcept {
co_return error_outcome::out_of_range;
}
model::offset begin;
model::offset end = _stm_manifest.get_last_offset();
model::offset end = end_inclusive.value_or(
_stm_manifest.get_last_offset());
if (_stm_manifest.get_archive_start_offset() == model::offset{}) {
begin = _stm_manifest.get_start_offset().value_or(begin);
} else {
begin = _stm_manifest.get_archive_start_offset();
}

if (end < begin) {
vlog(
_ctxlog.debug,
"invalid end offset: stm_manifest_begin={} stm_manifest_end={} "
"end_inclusive_override={}",
begin,
_stm_manifest.get_last_offset(),
end_inclusive);
co_return error_outcome::out_of_range;
}
auto cursor = std::make_unique<async_manifest_view_cursor>(
*this, begin, end, _manifest_meta_ttl());
// This calls 'get_materialized_manifest' internally which
Expand Down Expand Up @@ -754,8 +769,22 @@ async_manifest_view::time_based_retention(
"Computing time-based retention, boundary: {}, now: {}",
now - delta,
now);

if (!_stm_manifest.get_start_offset().has_value()) {
vlog(
_ctxlog.error,
"Empty STM manifest with archive in place: "
"stm_start_offset={}, archive_start_offset={}, "
"archive_clean_offset={}",
_stm_manifest.get_start_offset(),
_stm_manifest.get_archive_start_offset(),
_stm_manifest.get_archive_clean_offset());
co_return error_outcome::failure;
}

auto res = co_await get_cursor(
_stm_manifest.get_archive_start_offset());
_stm_manifest.get_archive_start_offset(),
model::prev_offset(_stm_manifest.get_start_offset().value()));
if (res.has_failure() && res.error() != error_outcome::out_of_range) {
vlog(
_ctxlog.error,
Expand Down Expand Up @@ -795,12 +824,13 @@ async_manifest_view::time_based_retention(
if (!eof) {
auto r = co_await cursor->next();
if (
r.has_failure()
&& r.error() == error_outcome::out_of_range) {
r.has_value()
&& r.value() == async_manifest_view_cursor::eof::yes) {
vlog(
_ctxlog.info,
"Entire archive is removed by the time-based "
"retention");
break;
} else if (r.has_failure()) {
vlog(
_ctxlog.error,
Expand Down Expand Up @@ -853,8 +883,22 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept {
cloud_log_size,
size_limit,
to_remove);

if (!_stm_manifest.get_start_offset().has_value()) {
vlog(
_ctxlog.error,
"Empty STM manifest with archive in place: "
"stm_start_offset={}, archive_start_offset={}, "
"archive_clean_offset={}",
_stm_manifest.get_start_offset(),
_stm_manifest.get_archive_start_offset(),
_stm_manifest.get_archive_clean_offset());
co_return error_outcome::failure;
}

auto res = co_await get_cursor(
_stm_manifest.get_archive_start_offset());
_stm_manifest.get_archive_start_offset(),
model::prev_offset(_stm_manifest.get_start_offset().value()));
if (res.has_failure()) {
vlog(
_ctxlog.error,
Expand All @@ -876,14 +920,15 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept {
auto eof = cursor->with_manifest(
[this, &to_remove, &result](const auto& manifest) mutable {
for (const auto& meta : manifest) {
result.offset = meta.base_offset;
result.delta = meta.delta_offset;

if (meta.size_bytes > to_remove) {
vlog(_ctxlog.debug, "Retention stop at {}", meta);
to_remove = 0;
return true;
} else {
to_remove -= meta.size_bytes;
result.offset = meta.base_offset;
result.delta = meta.delta_offset;
vlog(
_ctxlog.debug,
"Retention consume {}, remaining bytes: {}",
Expand All @@ -901,12 +946,32 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept {
if (!eof) {
auto r = co_await cursor->next();
if (
r.has_failure()
&& r.error() == error_outcome::out_of_range) {
r.has_value()
&& r.value() == async_manifest_view_cursor::eof::yes) {
// If the retention policy requires us to remove
// segments from the STM manifest, or if the entire
// archive was removed, the archive start offset should
// be advanced to match that of the STM region.
if (!_stm_manifest.empty()) {
// The STM manifest should never be empty here since
// we have an archive in place.
result.offset = _stm_manifest.begin()->base_offset;
result.delta = _stm_manifest.begin()->delta_offset;
} else {
vlog(
_ctxlog.error,
"Empty STM manifest with archive in place: "
"stm_start_offset={}, archive_start_offset={}, "
"archive_clean_offset={}",
_stm_manifest.get_start_offset(),
_stm_manifest.get_archive_start_offset(),
_stm_manifest.get_archive_clean_offset());
}
vlog(
_ctxlog.info,
"Entire archive is removed by the size-based "
"retention");
break;
} else if (r.has_failure()) {
vlog(
_ctxlog.error,
Expand Down
7 changes: 5 additions & 2 deletions src/v/cloud_storage/async_manifest_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ class async_manifest_view {
/// available.
ss::future<
result<std::unique_ptr<async_manifest_view_cursor>, error_outcome>>
get_cursor(async_view_search_query_t q) noexcept;
get_cursor(
async_view_search_query_t q,
std::optional<model::offset> end_inclusive = std::nullopt) noexcept;

/// Get inactive spillover manifests which are waiting for
/// retention
Expand Down Expand Up @@ -254,7 +256,8 @@ class async_manifest_view_cursor {
async_manifest_view_cursor_status get_status() const;

/// Move to the next manifest or fail
ss::future<result<bool, error_outcome>> next();
using eof = ss::bool_class<struct eof_tag>;
ss::future<result<eof, error_outcome>> next();

/// Shortcut to use with Seastar's future utils.
///
Expand Down
45 changes: 42 additions & 3 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,15 @@ void partition_manifest::set_archive_clean_offset(
return;
}
if (_archive_clean_offset < start_rp_offset) {
_archive_clean_offset = start_rp_offset;
if (start_rp_offset == _start_offset) {
// If we've truncated up to the start offset of the STM manifest,
// the archive is completely removed.
_archive_clean_offset = model::offset{};
_archive_start_offset = model::offset{};
_archive_start_offset_delta = model::offset_delta{};
} else {
_archive_clean_offset = start_rp_offset;
}
if (_archive_size_bytes >= size_bytes) {
_archive_size_bytes -= size_bytes;
} else {
Expand All @@ -669,12 +677,43 @@ void partition_manifest::set_archive_clean_offset(
start_rp_offset,
_archive_clean_offset);
}

// Prefix truncate to get rid of the spillover manifests
// that have fallen below the clean offset.
const auto previous_spillover_manifests_size = _spillover_manifests.size();
if (_archive_clean_offset == model::offset{}) {
// Handle the case where the entire archive was removed.
_spillover_manifests = {};
} else {
std::optional<model::offset> truncation_point;
for (const auto& spill : _spillover_manifests) {
if (spill.base_offset >= _archive_clean_offset) {
break;
}
truncation_point = spill.base_offset;
}

if (truncation_point) {
vassert(
_archive_clean_offset >= *truncation_point,
"Attempt to prefix truncate the spillover manifest list above "
"the "
"archive clean offest: {} > {}",
*truncation_point,
_archive_clean_offset);
_spillover_manifests.prefix_truncate(*truncation_point);
}
}

vlog(
cst_log.info,
"{} archive clean offset moved to {} archive size set to {}",
"{} archive clean offset moved to {} archive size set to {}; count of "
"spillover manifests {} -> {}",
_ntp,
_archive_clean_offset,
_archive_size_bytes);
_archive_size_bytes,
previous_spillover_manifests_size,
_spillover_manifests.size());
}

bool partition_manifest::advance_start_kafka_offset(
Expand Down
Loading

0 comments on commit b6e63b8

Please sign in to comment.