diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 4eba8e3b6a27..088433515f67 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -1931,6 +1931,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() { auto cursor = std::move(backlog.value()); + using eof = cloud_storage::async_manifest_view_cursor::eof; while (cursor->get_status() == cloud_storage::async_manifest_view_cursor_status:: materialized_spillover) { @@ -1984,7 +1985,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() { "Failed to load next spillover manifest: {}", res.error()); break; - } else if (res.value() == false) { + } else if (res.value() == eof::yes) { // End of stream break; } diff --git a/src/v/archival/tests/archival_metadata_stm_test.cc b/src/v/archival/tests/archival_metadata_stm_test.cc index 9145f633faf8..1dcc7700c224 100644 --- a/src/v/archival/tests/archival_metadata_stm_test.cc +++ b/src/v/archival/tests/archival_metadata_stm_test.cc @@ -711,9 +711,9 @@ FIXTURE_TEST(test_archival_stm_spillover, archival_metadata_stm_fixture) { model::offset{0}, 0, ss::lowres_clock::now() + 10s, never_abort) .get(); BOOST_REQUIRE_EQUAL( - archival_stm->manifest().get_archive_start_offset(), model::offset(0)); + archival_stm->manifest().get_archive_start_offset(), model::offset{}); BOOST_REQUIRE_EQUAL( - archival_stm->manifest().get_archive_clean_offset(), model::offset(0)); + archival_stm->manifest().get_archive_clean_offset(), model::offset{}); // unaligned spillover command shouldn't remove segment archival_stm diff --git a/src/v/archival/tests/ntp_archiver_test.cc b/src/v/archival/tests/ntp_archiver_test.cc index 5fd58043e450..3688d3b5c412 100644 --- a/src/v/archival/tests/ntp_archiver_test.cc +++ b/src/v/archival/tests/ntp_archiver_test.cc @@ -634,17 +634,23 @@ FIXTURE_TEST(test_archive_retention, archiver_fixture) { auto fut = archiver.garbage_collect_archive(); tests::cooperative_spin_wait_with_timeout(5s, [this, part]() mutable { const auto& manifest = part->archival_meta_stm()->manifest(); - bool archive_clean_moved = manifest.get_archive_clean_offset() - == model::offset{2000}; + bool archive_clean_offset_reset = manifest.get_archive_clean_offset() + == model::offset{}; + bool archive_start_offset_reset = manifest.get_archive_start_offset() + == model::offset{}; bool deletes_sent = std::count_if( get_targets().begin(), get_targets().end(), [](auto it) { return it.second.has_q_delete; }) == 2; - return archive_clean_moved && deletes_sent; + return archive_clean_offset_reset && archive_start_offset_reset + && deletes_sent; }).get(); fut.get(); + BOOST_REQUIRE_EQUAL( + part->archival_meta_stm()->manifest().get_spillover_map().size(), 0); + ss::sstring delete_payloads; for (const auto& [url, req] : get_targets()) { if (req.has_q_delete) { diff --git a/src/v/cloud_storage/async_manifest_view.cc b/src/v/cloud_storage/async_manifest_view.cc index 0f70b4f33cfe..fb855adb2b45 100644 --- a/src/v/cloud_storage/async_manifest_view.cc +++ b/src/v/cloud_storage/async_manifest_view.cc @@ -210,7 +210,8 @@ bool async_manifest_view_cursor::manifest_in_range( }); } -ss::future> async_manifest_view_cursor::next() { +ss::future> +async_manifest_view_cursor::next() { static constexpr auto EOS = model::offset{}; auto next_base_offset = ss::visit( _current, @@ -222,18 +223,18 @@ ss::future> async_manifest_view_cursor::next() { }); if (next_base_offset == EOS || next_base_offset > _end) { - co_return false; + co_return eof::yes; } auto manifest = co_await _view.get_materialized_manifest(next_base_offset); if (manifest.has_failure()) { co_return manifest.as_failure(); } if (unlikely(!manifest_in_range(manifest.value()))) { - co_return false; + co_return error_outcome::out_of_range; } _current = manifest.value(); _timer.rearm(_idle_timeout + ss::lowres_clock::now()); - co_return true; + co_return eof::no; } ss::future async_manifest_view_cursor::next_iter() { @@ -241,8 +242,8 @@ ss::future async_manifest_view_cursor::next_iter() { if (res.has_failure()) { throw std::system_error(res.error()); } - co_return res.value() == true ? ss::stop_iteration::yes - : ss::stop_iteration::no; + co_return res.value() == eof::yes ? ss::stop_iteration::yes + : ss::stop_iteration::no; } std::optional> @@ -479,7 +480,9 @@ ss::future<> async_manifest_view::run_bg_loop() { } ss::future, error_outcome>> -async_manifest_view::get_cursor(async_view_search_query_t query) noexcept { +async_manifest_view::get_cursor( + async_view_search_query_t query, + std::optional end_inclusive) noexcept { try { ss::gate::holder h(_gate); if ( @@ -494,12 +497,24 @@ async_manifest_view::get_cursor(async_view_search_query_t query) noexcept { co_return error_outcome::out_of_range; } model::offset begin; - model::offset end = _stm_manifest.get_last_offset(); + model::offset end = end_inclusive.value_or( + _stm_manifest.get_last_offset()); if (_stm_manifest.get_archive_start_offset() == model::offset{}) { begin = _stm_manifest.get_start_offset().value_or(begin); } else { begin = _stm_manifest.get_archive_start_offset(); } + + if (end < begin) { + vlog( + _ctxlog.debug, + "invalid end offset: stm_manifest_begin={} stm_manifest_end={} " + "end_inclusive_override={}", + begin, + _stm_manifest.get_last_offset(), + end_inclusive); + co_return error_outcome::out_of_range; + } auto cursor = std::make_unique( *this, begin, end, _manifest_meta_ttl()); // This calls 'get_materialized_manifest' internally which @@ -754,8 +769,22 @@ async_manifest_view::time_based_retention( "Computing time-based retention, boundary: {}, now: {}", now - delta, now); + + if (!_stm_manifest.get_start_offset().has_value()) { + vlog( + _ctxlog.error, + "Empty STM manifest with archive in place: " + "stm_start_offset={}, archive_start_offset={}, " + "archive_clean_offset={}", + _stm_manifest.get_start_offset(), + _stm_manifest.get_archive_start_offset(), + _stm_manifest.get_archive_clean_offset()); + co_return error_outcome::failure; + } + auto res = co_await get_cursor( - _stm_manifest.get_archive_start_offset()); + _stm_manifest.get_archive_start_offset(), + model::prev_offset(_stm_manifest.get_start_offset().value())); if (res.has_failure() && res.error() != error_outcome::out_of_range) { vlog( _ctxlog.error, @@ -795,12 +824,13 @@ async_manifest_view::time_based_retention( if (!eof) { auto r = co_await cursor->next(); if ( - r.has_failure() - && r.error() == error_outcome::out_of_range) { + r.has_value() + && r.value() == async_manifest_view_cursor::eof::yes) { vlog( _ctxlog.info, "Entire archive is removed by the time-based " "retention"); + break; } else if (r.has_failure()) { vlog( _ctxlog.error, @@ -853,8 +883,22 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept { cloud_log_size, size_limit, to_remove); + + if (!_stm_manifest.get_start_offset().has_value()) { + vlog( + _ctxlog.error, + "Empty STM manifest with archive in place: " + "stm_start_offset={}, archive_start_offset={}, " + "archive_clean_offset={}", + _stm_manifest.get_start_offset(), + _stm_manifest.get_archive_start_offset(), + _stm_manifest.get_archive_clean_offset()); + co_return error_outcome::failure; + } + auto res = co_await get_cursor( - _stm_manifest.get_archive_start_offset()); + _stm_manifest.get_archive_start_offset(), + model::prev_offset(_stm_manifest.get_start_offset().value())); if (res.has_failure()) { vlog( _ctxlog.error, @@ -876,14 +920,15 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept { auto eof = cursor->with_manifest( [this, &to_remove, &result](const auto& manifest) mutable { for (const auto& meta : manifest) { + result.offset = meta.base_offset; + result.delta = meta.delta_offset; + if (meta.size_bytes > to_remove) { vlog(_ctxlog.debug, "Retention stop at {}", meta); to_remove = 0; return true; } else { to_remove -= meta.size_bytes; - result.offset = meta.base_offset; - result.delta = meta.delta_offset; vlog( _ctxlog.debug, "Retention consume {}, remaining bytes: {}", @@ -901,12 +946,32 @@ async_manifest_view::size_based_retention(size_t size_limit) noexcept { if (!eof) { auto r = co_await cursor->next(); if ( - r.has_failure() - && r.error() == error_outcome::out_of_range) { + r.has_value() + && r.value() == async_manifest_view_cursor::eof::yes) { + // If the retention policy requires us to remove + // segments from the STM manifest, or if the entire + // archive was removed, the archive start offset should + // be advanced to match that of the STM region. + if (!_stm_manifest.empty()) { + // The STM manifest should never be empty here since + // we have an archive in place. + result.offset = _stm_manifest.begin()->base_offset; + result.delta = _stm_manifest.begin()->delta_offset; + } else { + vlog( + _ctxlog.error, + "Empty STM manifest with archive in place: " + "stm_start_offset={}, archive_start_offset={}, " + "archive_clean_offset={}", + _stm_manifest.get_start_offset(), + _stm_manifest.get_archive_start_offset(), + _stm_manifest.get_archive_clean_offset()); + } vlog( _ctxlog.info, "Entire archive is removed by the size-based " "retention"); + break; } else if (r.has_failure()) { vlog( _ctxlog.error, diff --git a/src/v/cloud_storage/async_manifest_view.h b/src/v/cloud_storage/async_manifest_view.h index 854ae92900e5..bff9290f0f0e 100644 --- a/src/v/cloud_storage/async_manifest_view.h +++ b/src/v/cloud_storage/async_manifest_view.h @@ -95,7 +95,9 @@ class async_manifest_view { /// available. ss::future< result, error_outcome>> - get_cursor(async_view_search_query_t q) noexcept; + get_cursor( + async_view_search_query_t q, + std::optional end_inclusive = std::nullopt) noexcept; /// Get inactive spillover manifests which are waiting for /// retention @@ -254,7 +256,8 @@ class async_manifest_view_cursor { async_manifest_view_cursor_status get_status() const; /// Move to the next manifest or fail - ss::future> next(); + using eof = ss::bool_class; + ss::future> next(); /// Shortcut to use with Seastar's future utils. /// diff --git a/src/v/cloud_storage/partition_manifest.cc b/src/v/cloud_storage/partition_manifest.cc index 1802ef76cfd1..75c035b7d8b3 100644 --- a/src/v/cloud_storage/partition_manifest.cc +++ b/src/v/cloud_storage/partition_manifest.cc @@ -644,7 +644,15 @@ void partition_manifest::set_archive_clean_offset( return; } if (_archive_clean_offset < start_rp_offset) { - _archive_clean_offset = start_rp_offset; + if (start_rp_offset == _start_offset) { + // If we've truncated up to the start offset of the STM manifest, + // the archive is completely removed. + _archive_clean_offset = model::offset{}; + _archive_start_offset = model::offset{}; + _archive_start_offset_delta = model::offset_delta{}; + } else { + _archive_clean_offset = start_rp_offset; + } if (_archive_size_bytes >= size_bytes) { _archive_size_bytes -= size_bytes; } else { @@ -669,12 +677,43 @@ void partition_manifest::set_archive_clean_offset( start_rp_offset, _archive_clean_offset); } + + // Prefix truncate to get rid of the spillover manifests + // that have fallen below the clean offset. + const auto previous_spillover_manifests_size = _spillover_manifests.size(); + if (_archive_clean_offset == model::offset{}) { + // Handle the case where the entire archive was removed. + _spillover_manifests = {}; + } else { + std::optional truncation_point; + for (const auto& spill : _spillover_manifests) { + if (spill.base_offset >= _archive_clean_offset) { + break; + } + truncation_point = spill.base_offset; + } + + if (truncation_point) { + vassert( + _archive_clean_offset >= *truncation_point, + "Attempt to prefix truncate the spillover manifest list above " + "the " + "archive clean offest: {} > {}", + *truncation_point, + _archive_clean_offset); + _spillover_manifests.prefix_truncate(*truncation_point); + } + } + vlog( cst_log.info, - "{} archive clean offset moved to {} archive size set to {}", + "{} archive clean offset moved to {} archive size set to {}; count of " + "spillover manifests {} -> {}", _ntp, _archive_clean_offset, - _archive_size_bytes); + _archive_size_bytes, + previous_spillover_manifests_size, + _spillover_manifests.size()); } bool partition_manifest::advance_start_kafka_offset( diff --git a/src/v/cloud_storage/tests/async_manifest_view_test.cc b/src/v/cloud_storage/tests/async_manifest_view_test.cc index 6c386d41fbac..d9f104174de7 100644 --- a/src/v/cloud_storage/tests/async_manifest_view_test.cc +++ b/src/v/cloud_storage/tests/async_manifest_view_test.cc @@ -35,6 +35,7 @@ #include using namespace cloud_storage; +using eof = async_manifest_view_cursor::eof; static ss::logger test_log("async_manifest_view_log"); static const model::initial_revision_id manifest_rev(111); @@ -279,7 +280,7 @@ FIXTURE_TEST(test_async_manifest_view_iter, async_manifest_view_fixture) { actual.push_back(meta); } }); - } while (cursor->next().get().value()); + } while (cursor->next().get().value() != eof::yes); print_diff(actual, expected); BOOST_REQUIRE_EQUAL(expected.size(), actual.size()); BOOST_REQUIRE(expected == actual); @@ -329,7 +330,7 @@ FIXTURE_TEST(test_async_manifest_view_truncate, async_manifest_view_fixture) { actual.push_back(meta); } }); - } while (cursor->next().get().value()); + } while (cursor->next().get().value() != eof::yes); print_diff(actual, expected); BOOST_REQUIRE_EQUAL(expected.size(), actual.size()); BOOST_REQUIRE(expected == actual); @@ -351,7 +352,7 @@ FIXTURE_TEST(test_async_manifest_view_truncate, async_manifest_view_fixture) { actual.push_back(meta); } }); - } while (cursor->next().get().value()); + } while (cursor->next().get().value() != eof::yes); print_diff(actual, removed); BOOST_REQUIRE_EQUAL(removed.size(), actual.size()); BOOST_REQUIRE(removed == actual); @@ -378,7 +379,7 @@ FIXTURE_TEST(test_async_manifest_view_truncate, async_manifest_view_fixture) { actual.push_back(meta); } }); - } while (cursor->next().get().value()); + } while (cursor->next().get().value() != eof::yes); print_diff(actual, removed); BOOST_REQUIRE_EQUAL(removed.size(), actual.size()); BOOST_REQUIRE(removed == actual); @@ -449,7 +450,7 @@ FIXTURE_TEST( actual.push_back(meta); } }); - } while (cursor->next().get().value()); + } while (cursor->next().get().value() != eof::yes); print_diff(actual, expected); BOOST_REQUIRE_EQUAL(expected.size(), actual.size()); BOOST_REQUIRE(expected == actual); @@ -479,7 +480,7 @@ FIXTURE_TEST( actual.push_back(meta); } }); - } while (cursor->next().get().value()); + } while (cursor->next().get().value() != eof::yes); print_diff(actual, removed); BOOST_REQUIRE_EQUAL(removed.size(), actual.size()); BOOST_REQUIRE(removed == actual); @@ -563,14 +564,16 @@ FIXTURE_TEST(test_async_manifest_view_retention, async_manifest_view_fixture) { model::offset prefix_base_offset; model::offset_delta prefix_delta; for (const auto& meta : expected) { - prefix_size += meta.size_bytes; prefix_timestamp = meta.base_timestamp; prefix_base_offset = meta.base_offset; prefix_delta = meta.delta_offset; - quota--; + if (quota == 0) { break; } + + prefix_size += meta.size_bytes; + quota--; } vlog( diff --git a/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc b/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc index 998b8bf5fd46..37d5c7627971 100644 --- a/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc +++ b/src/v/cloud_storage/tests/remote_partition_fuzz_test.cc @@ -194,6 +194,11 @@ scan_remote_partition_incrementally_with_reuploads( /// This test scans the entire range of offsets FIXTURE_TEST( test_remote_partition_scan_translate_full_random, cloud_storage_fixture) { + vlog( + test_log.info, + "Seed used for read workload: {}", + random_generators::internal::seed); + constexpr int num_segments = 1000; const auto [batch_types, num_data_batches] = generate_segment_layout( num_segments, 42); @@ -217,6 +222,11 @@ FIXTURE_TEST( FIXTURE_TEST( test_remote_partition_scan_incrementally_random, cloud_storage_fixture) { + vlog( + test_log.info, + "Seed used for read workload: {}", + random_generators::internal::seed); + constexpr int num_segments = 1000; const auto [batch_types, num_data_batches] = generate_segment_layout( num_segments, 42); @@ -242,6 +252,11 @@ FIXTURE_TEST( FIXTURE_TEST( test_remote_partition_scan_incrementally_random_with_overlaps, cloud_storage_fixture) { + vlog( + test_log.info, + "Seed used for read workload: {}", + random_generators::internal::seed); + constexpr int num_segments = 1000; const auto [batch_types, num_data_batches] = generate_segment_layout( num_segments, 42); @@ -272,6 +287,11 @@ FIXTURE_TEST( FIXTURE_TEST( test_remote_partition_scan_incrementally_random_with_tx_fence, cloud_storage_fixture) { + vlog( + test_log.info, + "Seed used for read workload: {}", + random_generators::internal::seed); + constexpr int num_segments = 1000; const auto [segment_layout, num_data_batches] = generate_segment_layout( num_segments, 42, false); @@ -306,6 +326,11 @@ FIXTURE_TEST( FIXTURE_TEST( test_remote_partition_scan_incrementally_random_with_reuploads, cloud_storage_fixture) { + vlog( + test_log.info, + "Seed used for read workload: {}", + random_generators::internal::seed); + constexpr int num_segments = 1000; const auto [batch_types, num_data_batches] = generate_segment_layout( num_segments, 42); diff --git a/src/v/random/generators.h b/src/v/random/generators.h index 158f6ece0e71..e99b1ab07893 100644 --- a/src/v/random/generators.h +++ b/src/v/random/generators.h @@ -28,8 +28,11 @@ inline std::random_device::result_type get_seed() { auto seed = rd(); return seed; } + +static const auto seed = get_seed(); + // NOLINTNEXTLINE -static thread_local std::default_random_engine gen(internal::get_seed()); +static thread_local std::default_random_engine gen(internal::seed); } // namespace internal bytes get_bytes(size_t n = 128 * 1024); diff --git a/tests/rptest/utils/si_utils.py b/tests/rptest/utils/si_utils.py index 64b048e92522..fae64342d10e 100644 --- a/tests/rptest/utils/si_utils.py +++ b/tests/rptest/utils/si_utils.py @@ -32,6 +32,8 @@ default_log_segment_size = 1048576 # 1MB +DEFAULT_OFFSET = -9223372036854775808 + class NT(NamedTuple): ns: str @@ -1291,13 +1293,16 @@ def assert_segments_deleted(self, topic: str, partition: int): def segment_summaries(self, ntp: NTP): self._ensure_listing() - return self._state.segment_summaries[ntp] + if ntp in self._state.segment_summaries: + return self._state.segment_summaries[ntp] + else: + return dict() def is_archive_cleanup_complete(self, ntp: NTP): self._ensure_listing() manifest = self.manifest_for_ntp(ntp.topic, ntp.partition) - aso = manifest.get('archive_start_offset') - aco = manifest.get('archive_clean_offset') + aso = manifest.get('archive_start_offset', DEFAULT_OFFSET) + aco = manifest.get('archive_clean_offset', DEFAULT_OFFSET) summaries = self.segment_summaries(ntp) num = len(summaries) if aso > 0 and aco < aso: @@ -1309,7 +1314,7 @@ def is_archive_cleanup_complete(self, ntp: NTP): if len(summaries) == 0: self.logger.debug( f"archive is empty, start: {aso}, clean: {aco}, len: {num}") - return False + return True first_segment = min(summaries, key=lambda seg: seg.base_offset) @@ -1324,10 +1329,17 @@ def is_archive_cleanup_complete(self, ntp: NTP): def check_archive_integrity(self, ntp: NTP): self._ensure_listing() manifest = self.manifest_for_ntp(ntp.topic, ntp.partition) - next_base_offset = manifest.get('archive_start_offset') - expected_last = manifest.get('last_offset') - for summary in self.segment_summaries(ntp): - assert next_base_offset == summary.base_offset, f"Unexpected segment {summary}, expected base offset {next_base_offset}" - next_base_offset = summary.last_offset + 1 + summaries = self.segment_summaries(ntp) + if len(summaries) == 0: + assert 'archive_start_offset' not in manifest + assert 'archive_start_offset' not in manifest else: - assert expected_last == summary.last_offset, f"Unexpected last offset {summary.last_offset}, expected: {expected_last}" + next_base_offset = manifest.get('archive_start_offset') + stm_start_offset = manifest.get('start_offset') + expected_last = manifest.get('last_offset') + + for summary in summaries: + assert next_base_offset == summary.base_offset, f"Unexpected segment {summary}, expected base offset {next_base_offset}" + next_base_offset = summary.last_offset + 1 + else: + assert expected_last == summary.last_offset, f"Unexpected last offset {summary.last_offset}, expected: {expected_last}"