diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc index a76ac465a7f2..5eb37c436303 100644 --- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc +++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc @@ -43,6 +43,70 @@ using tests::kv_t; static ss::logger e2e_test_log("e2e_test"); +class ManualFixture + : public s3_imposter_fixture + , public manual_metadata_upload_mixin + , public redpanda_thread_fixture + , public enable_cloud_storage_fixture + , public ::testing::Test { +public: + ManualFixture() + : redpanda_thread_fixture( + redpanda_thread_fixture::init_cloud_storage_tag{}, + httpd_port_number()) { + // No expectations: tests will PUT and GET organically. + set_expectations_and_listen({}); + wait_for_controller_leadership().get(); + } + + scoped_config test_local_cfg; +}; + +TEST_F(ManualFixture, TestSpilloverRetentionCompactedTopic) { + test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests") + .set_value(true); + test_local_cfg.get("cloud_storage_spillover_manifest_max_segments") + .set_value(std::make_optional(5)); + test_local_cfg.get("cloud_storage_spillover_manifest_size") + .set_value(std::optional{}); + test_local_cfg.get("log_retention_ms") + .set_value(std::make_optional(1ms)); + const model::topic topic_name("tapioca"); + model::ntp ntp(model::kafka_namespace, topic_name, 0); + + cluster::topic_properties props; + props.shadow_indexing = model::shadow_indexing_mode::full; + props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; + add_topic({model::kafka_namespace, topic_name}, 1, props).get(); + wait_for_leader(ntp).get(); + + const auto records_per_seg = 5; + const auto num_segs = 100; + auto partition = app.partition_manager.local().get(ntp); + auto& archiver = partition->archiver().value().get(); + tests::remote_segment_generator gen(make_kafka_client().get(), *partition); + auto total_records = gen.num_segments(num_segs) + .batches_per_segment(records_per_seg) + .produce() + .get(); + ASSERT_GE(total_records, 500); + ASSERT_TRUE(archiver.sync_for_tests().get()); + archiver.apply_spillover().get(); + ss::sleep(5s).get(); + archiver.apply_archive_retention().get(); + + tests::kafka_list_offsets_transport lister(make_kafka_client().get()); + lister.start().get(); + + auto offset + = lister.start_offset_for_partition(topic_name, model::partition_id(0)) + .get(); + ASSERT_EQ(offset(), 0); + ASSERT_EQ( + archiver.manifest().full_log_start_offset().value_or(model::offset{})(), + 0); +} + class EndToEndFixture : public s3_imposter_fixture , public manual_metadata_upload_mixin diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index 0f6d47b4bd44..475064075650 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -2291,6 +2291,10 @@ ss::future<> ntp_archiver::apply_archive_retention() { } const auto& ntp_conf = _parent.get_ntp_config(); + if (!ntp_conf.is_collectable()) { + vlog(_rtclog.trace, "NTP is not collectable"); + co_return; + } std::optional retention_bytes = ntp_conf.retention_bytes(); std::optional retention_ms = ntp_conf.retention_duration();