diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index 1213b5cf2f18..78a487b84a44 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -111,6 +111,7 @@ v_cc_library( tm_stm_cache.cc tm_stm.cc rm_stm.cc + log_eviction_stm.cc tx_helpers.cc security_manager.cc security_frontend.cc diff --git a/src/v/cluster/fwd.h b/src/v/cluster/fwd.h index 1eb14139b635..38fff6c51ddd 100644 --- a/src/v/cluster/fwd.h +++ b/src/v/cluster/fwd.h @@ -19,6 +19,7 @@ class controller_stm; class controller_stm_shard; class id_allocator_frontend; class rm_partition_frontend; +class log_eviction_stm; class tx_registry_frontend; class tx_coordinator_mapper; class tm_stm_cache; diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc new file mode 100644 index 000000000000..cc2b0c85849f --- /dev/null +++ b/src/v/cluster/log_eviction_stm.cc @@ -0,0 +1,364 @@ +// Copyright 2020 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/log_eviction_stm.h" + +#include "bytes/iostream.h" +#include "raft/consensus.h" +#include "raft/types.h" +#include "utils/gate_guard.h" + +#include + +namespace cluster { + +struct snapshot_data + : serde:: + envelope, serde::compat_version<0>> { + model::offset effective_start_offset{}; + + auto serde_fields() { return std::tie(effective_start_offset); } + + friend std::ostream& operator<<(std::ostream& os, const snapshot_data& d) { + fmt::print( + os, "{{ effective_start_offset: {} }}", d.effective_start_offset); + return os; + } +}; + +log_eviction_stm::log_eviction_stm( + raft::consensus* raft, + ss::logger& logger, + ss::abort_source& as, + storage::kvstore& kvstore) + : persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore) + , _logger(logger) + , _as(as) {} + +ss::future<> log_eviction_stm::start() { + ssx::spawn_with_gate(_gate, [this] { return monitor_log_eviction(); }); + ssx::spawn_with_gate( + _gate, [this] { return write_raft_snapshots_in_background(); }); + return persisted_stm::start(); +} + +ss::future<> log_eviction_stm::stop() { + _reap_condition.broken(); + co_await persisted_stm::stop(); +} + +ss::future<> log_eviction_stm::write_raft_snapshots_in_background() { + /// This method is executed as a background fiber and it attempts to write + /// snapshots as close to effective_start_offset as possible. + auto gh = _gate.hold(); + while (!_as.abort_requested()) { + try { + /// This background fiber can be woken-up via apply() when special + /// batches are processed or by the storage layer when local + /// eviction is triggered. 
+ co_await _reap_condition.wait(); + } catch (const ss::broken_condition_variable&) { + /// stop() has been called + break; + } + auto evict_until = std::max( + _delete_records_eviction_offset, _storage_eviction_offset); + if (evict_until > model::offset{}) { + auto index_lb = _raft->log().index_lower_bound(evict_until); + if (index_lb) { + vassert( + index_lb <= evict_until, + "Calculated boundary {} must be <= effective_start {} ", + index_lb, + evict_until); + try { + co_await do_write_raft_snapshot(*index_lb); + } catch (const std::exception& e) { + vlog( + _logger.error, + "Error occurred when attempting to write snapshot: " + "{}, ntp: {}", + e, + _raft->ntp()); + } + } + } + } +} + +ss::future<> log_eviction_stm::monitor_log_eviction() { + /// This method is executed as a background fiber and is listening for + /// eviction events from the storage layer. These events will trigger a + /// write snapshot, and the log will be prefix truncated. + auto gh = _gate.hold(); + while (!_as.abort_requested()) { + try { + _storage_eviction_offset = co_await _raft->monitor_log_eviction( + _as); + vlog( + _logger.trace, + "Handling log deletion notification for offset: {}, ntp: {}", + _storage_eviction_offset, + _raft->ntp()); + const auto max_collectible_offset + = _raft->log().stm_manager()->max_collectible_offset(); + const auto next_eviction_offset = std::min( + max_collectible_offset, _storage_eviction_offset); + _reap_condition.signal(); + /// Do not attempt to process another eviction event from storage + /// until the current has completed fully + co_await _last_snapshot_monitor.wait( + next_eviction_offset, model::no_timeout, _as); + } catch (const ss::abort_requested_exception&) { + // ignore abort requested exception, shutting down + } catch (const ss::gate_closed_exception&) { + // ignore gate closed exception, shutting down + } catch (const std::exception& e) { + vlog( + _logger.info, + "Error handling log eviction - {}, ntp: {}", + e, + _raft->ntp()); + } + } +} + +ss::future<> log_eviction_stm::do_write_raft_snapshot(model::offset index_lb) { + if (index_lb <= _raft->last_snapshot_index()) { + co_return; + } + co_await _raft->visible_offset_monitor().wait( + index_lb, model::no_timeout, _as); + co_await _raft->refresh_commit_index(); + co_await _raft->log().stm_manager()->ensure_snapshot_exists(index_lb); + const auto max_collectible_offset + = _raft->log().stm_manager()->max_collectible_offset(); + if (index_lb > max_collectible_offset) { + vlog( + _logger.trace, + "Can only evict up to offset: {}, ntp: {}", + max_collectible_offset, + _raft->ntp()); + index_lb = max_collectible_offset; + } + co_await _raft->write_snapshot(raft::write_snapshot_cfg(index_lb, iobuf())); + _last_snapshot_monitor.notify(index_lb); +} + +ss::future> +log_eviction_stm::sync_effective_start(model::timeout_clock::duration timeout) { + /// Call this method to ensure followers have processed up until the + /// most recent known version of the special batch. This is particularly + /// useful to know if the start offset is up to date in the case + /// leadership has recently changed for example. + auto term = _raft->term(); + if (!co_await sync(timeout)) { + if (term != _raft->term()) { + co_return errc::not_leader; + } else { + co_return errc::timeout; + } + } + co_return effective_start_offset(); +} + +model::offset log_eviction_stm::effective_start_offset() const { + /// The start offset is the max of either the last snapshot index or the + /// most recent delete records eviciton offset. 
This is because even after + /// bootstrap the larger of the two will reflect the most recent event that + /// has occurred, and will be the correct start offset. + /// + /// NOTE: Cannot replace last_snapshot_index with _storage_eviction_offset + /// as this is the requested eviction offset and its not persisted anywhere. + /// In the event this is set but a crash occurred before write_snapshot was + /// called (occurs in background) it would appear that the start offset was + /// incremented then returned to a previous value. + return model::next_offset( + std::max(_raft->last_snapshot_index(), _delete_records_eviction_offset)); +} + +ss::future log_eviction_stm::truncate( + model::offset rp_truncate_offset, + ss::lowres_clock::time_point deadline, + std::optional> as) { + /// Create the special prefix_truncate batch, it is a model::record_batch + /// with exactly one record within it, the point at which to truncate + storage::record_batch_builder builder( + model::record_batch_type::prefix_truncate, model::offset(0)); + /// Everything below the requested offset should be truncated, requested + /// offset itself will be the new low_watermark (readable) + auto key = serde::to_iobuf(rp_truncate_offset - model::offset{1}); + builder.add_raw_kv(std::move(key), iobuf()); + auto batch = std::move(builder).build(); + + /// After command replication all that can be guaranteed is that the command + /// was replicated + vlog( + _logger.info, + "Replicating prefix_truncate command, truncate_offset: {} current " + "start offset: {}, current last snapshot offset: {}, current last " + "visible " + "offset: {}", + rp_truncate_offset, + effective_start_offset(), + _raft->last_snapshot_index(), + _raft->last_visible_index()); + + auto ec = co_await replicate_command(std::move(batch), deadline, as); + if (ec) { + vlog( + _logger.error, + "Failed to observe replicated command in log, reason: {}", + ec.message()); + co_return ec; + } + co_return errc::success; +} + +ss::future log_eviction_stm::replicate_command( + model::record_batch batch, + ss::lowres_clock::time_point deadline, + std::optional> as) { + auto fut = _raft->replicate( + _raft->term(), + model::make_memory_record_batch_reader(std::move(batch)), + raft::replicate_options{raft::consistency_level::quorum_ack}); + + /// Execute the replicate command bound by timeout and cancellable via + /// abort_source mechanism + result result{{}}; + try { + if (as) { + result = co_await ssx::with_timeout_abortable( + std::move(fut), deadline, *as); + } else { + result = co_await ss::with_timeout(deadline, std::move(fut)); + } + } catch (const ss::timed_out_error&) { + result = errc::timeout; + } + + if (!result) { + vlog( + _logger.error, + "Failed to replicate prefix_truncate command, reason: {}", + result.error()); + co_return result.error(); + } + + /// The following will return when the command replicated above has been + /// processed by the apply() method. This effectively bumps the start offset + /// to the requested value and since apply is deterministic this is + /// guaranteed to occur. No guarantees of data removal / availability can be + /// made at or after this point, since that occurs in a background fiber. 
+ auto applied = co_await wait_no_throw( + result.value().last_offset, deadline, as); + if (!applied) { + if (as && as->get().abort_requested()) { + co_return errc::shutting_down; + } + co_return errc::timeout; + } + co_return errc::success; +} + +ss::future<> log_eviction_stm::apply(model::record_batch batch) { + /// The work done within apply() must be deterministic, that way the start + /// offset will always be the same value across all replicas + /// + /// Here all apply() does is move forward the in memory start offset; a + /// background fiber is responsible for evicting as much as possible + if (unlikely( + batch.header().type == model::record_batch_type::prefix_truncate)) { + /// record_batches of type ::prefix_truncate are always of size 1 + const auto truncate_offset = serde::from_iobuf( + batch.copy_records().begin()->release_key()); + if (truncate_offset > _delete_records_eviction_offset) { + vlog( + _logger.debug, + "Moving effective start offset to " + "truncate_point: {} last_applied: {} ntp: {}", + truncate_offset, + last_applied_offset(), + _raft->ntp()); + + /// Set the new in memory start offset + _delete_records_eviction_offset = truncate_offset; + /// Wake up the background reaping thread + _reap_condition.signal(); + /// Writing a local snapshot is just an optimization, delete-records + /// is infrequently called and there's no better time to persist the + /// fact that a new start offset has been written to disk + co_await make_snapshot(); + } + } +} + +ss::future<> log_eviction_stm::handle_eviction() { + /// In the case there is a gap detected in the log, the only path + /// forward is to read the raft snapshot and begin processing from the + /// raft last_snapshot_index + auto raft_snapshot = co_await _raft->open_snapshot(); + if (!raft_snapshot) { + throw std::runtime_error{fmt_with_ctx( + fmt::format, + "encountered a gap in the raft log (last_applied: {}, log start " + "offset: {}), but can't find the snapshot - ntp: {}", + last_applied_offset(), + _raft->start_offset(), + _raft->ntp())}; + } + + auto last_snapshot_index = raft_snapshot->metadata.last_included_index; + co_await raft_snapshot->close(); + _delete_records_eviction_offset = model::offset{}; + _storage_eviction_offset = last_snapshot_index; + set_next(model::next_offset(last_snapshot_index)); + vlog( + _logger.info, + "Handled log eviction new effective start offset: {} for ntp: {}", + effective_start_offset(), + _c->ntp()); +} + +ss::future<> +log_eviction_stm::apply_snapshot(stm_snapshot_header header, iobuf&& data) { + auto snapshot = serde::from_iobuf(std::move(data)); + vlog( + _logger.info, + "Applying snapshot {} at offset: {} for ntp: {}", + snapshot, + header.offset, + _raft->ntp()); + + _delete_records_eviction_offset = snapshot.effective_start_offset; + _last_snapshot_offset = header.offset; + _insync_offset = header.offset; + return ss::now(); +} + +ss::future log_eviction_stm::take_snapshot() { + vlog( + _logger.trace, + "Taking snapshot at offset: {} for ntp: {}", + last_applied_offset(), + _raft->ntp()); + iobuf snap_data = serde::to_iobuf( + snapshot_data{.effective_start_offset = _delete_records_eviction_offset}); + co_return stm_snapshot::create( + 0, last_applied_offset(), std::move(snap_data)); +} + +ss::future<> log_eviction_stm::ensure_snapshot_exists(model::offset) { + /// This class drives eviction, therefore it cannot wait until its own + /// snapshot exists before writing a snapshot + return ss::now(); +} + +} // namespace cluster diff --git a/src/v/cluster/log_eviction_stm.h
b/src/v/cluster/log_eviction_stm.h new file mode 100644 index 000000000000..7de3f5f9e813 --- /dev/null +++ b/src/v/cluster/log_eviction_stm.h @@ -0,0 +1,123 @@ +/* + * Copyright 2020 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once +#include "cluster/persisted_stm.h" +#include "config/configuration.h" +#include "model/fundamental.h" +#include "raft/fwd.h" +#include "seastarx.h" +#include "storage/types.h" +#include "utils/mutex.h" + +#include +#include +#include + +namespace cluster { + +class consensus; + +/** + * Responsible for taking snapshots triggered by underlying log segments + * eviction. + * + * The process goes like this: the storage layer will send a "deletion notification" + * - a request to evict the log up to a certain offset. log_eviction_stm will then + * adjust that offset with _stm_manager->max_collectible_offset(), write the + * raft snapshot and notify the storage layer that log eviction can safely + * proceed up to the adjusted offset. + * + * This class also initiates and responds to delete-records events. Calling + * truncate() pushes a special prefix_truncate batch onto the log which this + * stm will be searching for. Upon processing of this record a new snapshot will + * be written, which may also trigger deletion of data on disk. + */ +class log_eviction_stm final + : public persisted_stm { public: + log_eviction_stm( + raft::consensus*, ss::logger&, ss::abort_source&, storage::kvstore&); + + ss::future<> start() override; + + ss::future<> stop() override; + + /// Truncate local log + /// + /// This method doesn't immediately delete the entries of the log below the + /// requested offset but pushes a special record batch onto the log which, + /// when read by brokers, will invoke a routine that will perform the + /// deletion + ss::future truncate( + model::offset kafka_offset, + ss::lowres_clock::time_point deadline, + std::optional> as + = std::nullopt); + + /// Return the offset up to which the storage layer would like to + /// prefix truncate the log, if any.
+ std::optional eviction_requested_offset() const { + const auto requested_eviction_offset = std::max( + _delete_records_eviction_offset, _storage_eviction_offset); + if (!requested_eviction_offset) { + return std::nullopt; + } + return requested_eviction_offset; + } + + /// This class drives eviction, it works differently then other stms in this + /// regard + /// + /// Override to ensure it never unnecessarily waits + ss::future<> ensure_snapshot_exists(model::offset) override; + + /// The actual start offset of the log with the delta factored in + model::offset effective_start_offset() const; + + /// Ensure followers have processed up until the most recent known version + /// of the batch representing the start offset + ss::future> + sync_effective_start(model::timeout_clock::duration timeout); + +protected: + ss::future<> apply_snapshot(stm_snapshot_header, iobuf&&) override; + + ss::future take_snapshot() override; + +private: + void increment_start_offset(model::offset); + bool should_process_evict(model::offset); + + ss::future<> monitor_log_eviction(); + ss::future<> do_write_raft_snapshot(model::offset); + ss::future<> write_raft_snapshots_in_background(); + ss::future<> apply(model::record_batch) override; + ss::future<> handle_eviction() override; + + ss::future replicate_command( + model::record_batch batch, + ss::lowres_clock::time_point deadline, + std::optional> as); + +private: + ss::logger& _logger; + ss::abort_source& _as; + model::offset _storage_eviction_offset; + model::offset _delete_records_eviction_offset; + + /// Signaled when a snapshot should be taken, and data deleted + ss::condition_variable _reap_condition; + /// To maintain backpressure on snapshot writes from storage + raft::offset_monitor _last_snapshot_monitor; +}; + +} // namespace cluster diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 0c1f3df16bec..e3a6957f30a6 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -48,6 +48,7 @@ partition::partition( ss::sharded& feature_table, ss::sharded& tm_stm_cache_manager, ss::sharded& upload_hks, + storage::kvstore& kvstore, config::binding max_concurrent_producer_ids, std::optional read_replica_bucket) : _raft(r) @@ -65,16 +66,20 @@ partition::partition( , _cloud_storage_cache(cloud_storage_cache) , _cloud_storage_probe( ss::make_shared(_raft->ntp())) - , _upload_housekeeping(upload_hks) { + , _upload_housekeeping(upload_hks) + , _kvstore(kvstore) { auto stm_manager = _raft->log().stm_manager(); if (is_id_allocator_topic(_raft->ntp())) { _id_allocator_stm = ss::make_shared( clusterlog, _raft.get()); } else if (is_tx_manager_topic(_raft->ntp())) { - if (_raft->log_config().is_collectable()) { - _log_eviction_stm = ss::make_lw_shared( - _raft.get(), clusterlog, stm_manager, _as); + if ( + _raft->log_config().is_collectable() + && !storage::deletion_exempt(_raft->ntp())) { + _log_eviction_stm = ss::make_shared( + _raft.get(), clusterlog, _as, _kvstore); + stm_manager->add_stm(_log_eviction_stm); } if (_is_tx_enabled) { @@ -85,9 +90,12 @@ partition::partition( stm_manager->add_stm(_tm_stm); } } else { - if (_raft->log_config().is_collectable()) { - _log_eviction_stm = ss::make_lw_shared( - _raft.get(), clusterlog, stm_manager, _as); + if ( + _raft->log_config().is_collectable() + && !storage::deletion_exempt(_raft->ntp())) { + _log_eviction_stm = ss::make_shared( + _raft.get(), clusterlog, _as, _kvstore); + stm_manager->add_stm(_log_eviction_stm); } const model::topic_namespace tp_ns( _raft->ntp().ns, 
_raft->ntp().tp.topic); @@ -168,6 +176,36 @@ partition::partition( partition::~partition() {} +ss::future partition::prefix_truncate( + model::offset truncation_offset, ss::lowres_clock::time_point deadline) { + if (!_log_eviction_stm) { + vlog( + clusterlog.info, + "Cannot prefix-truncate topic/partition {} retention settings not " + "applied", + _raft->ntp()); + co_return make_error_code(errc::topic_invalid_config); + } + if (_archival_meta_stm) { + vlog( + clusterlog.info, + "Cannot prefix-truncate topic/partition {} cloud settings are " + "applied", + _raft->ntp()); + co_return make_error_code(errc::topic_invalid_config); + } + if (!feature_table().local().is_active(features::feature::delete_records)) { + vlog( + clusterlog.info, + "Cannot prefix-truncate topic/partition {} feature is currently " + "disabled", + _raft->ntp()); + co_return make_error_code(cluster::errc::feature_disabled); + } + co_return co_await _log_eviction_stm->truncate( + truncation_offset, deadline, _as); +} + ss::future> partition::aborted_transactions_cloud( const cloud_storage::offset_range& offsets) { return _cloud_storage_partition->aborted_transactions(offsets); @@ -865,6 +903,9 @@ ss::future<> partition::remove_persistent_state() { if (_id_allocator_stm) { co_await _id_allocator_stm->remove_persistent_state(); } + if (_log_eviction_stm) { + co_await _log_eviction_stm->remove_persistent_state(); + } } /** diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 7ffe58af1005..b99e14226b5f 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -15,6 +15,7 @@ #include "cloud_storage/fwd.h" #include "cluster/archival_metadata_stm.h" #include "cluster/id_allocator_stm.h" +#include "cluster/log_eviction_stm.h" #include "cluster/partition_probe.h" #include "cluster/rm_stm.h" #include "cluster/tm_stm.h" @@ -29,7 +30,6 @@ #include "raft/consensus.h" #include "raft/consensus_utils.h" #include "raft/group_configuration.h" -#include "raft/log_eviction_stm.h" #include "raft/types.h" #include "storage/translating_reader.h" #include "storage/types.h" @@ -53,6 +53,7 @@ class partition { ss::sharded&, ss::sharded&, ss::sharded&, + storage::kvstore&, config::binding, std::optional read_replica_bucket = std::nullopt); @@ -70,6 +71,11 @@ class partition { ss::future> replicate(model::record_batch_reader&&, raft::replicate_options); + /// Truncate the beginning of the log up until a given offset + /// Can only be performed on logs that are deletable and non internal + ss::future + prefix_truncate(model::offset o, ss::lowres_clock::time_point deadline); + kafka_stages replicate_in_stages( model::batch_identity, model::record_batch_reader&&, @@ -86,7 +92,20 @@ class partition { return _raft->make_reader(std::move(config), deadline); } - model::offset start_offset() const { return _raft->start_offset(); } + ss::future> + sync_effective_start(model::timeout_clock::duration timeout) { + if (_log_eviction_stm) { + co_return co_await _log_eviction_stm->sync_effective_start(timeout); + } + co_return start_offset(); + } + + model::offset start_offset() const { + if (_log_eviction_stm) { + return _log_eviction_stm->effective_start_offset(); + } + return _raft->start_offset(); + } /** * The returned value of last committed offset should not be used to @@ -375,7 +394,7 @@ class partition { consensus_ptr _raft; ss::shared_ptr _partition_mem_tracker; - ss::lw_shared_ptr _log_eviction_stm; + ss::shared_ptr _log_eviction_stm; ss::shared_ptr _id_allocator_stm; ss::shared_ptr _rm_stm; ss::shared_ptr _tm_stm; @@ 
-405,6 +424,8 @@ class partition { ss::sharded& _upload_housekeeping; + storage::kvstore& _kvstore; + friend std::ostream& operator<<(std::ostream& o, const partition& x); }; } // namespace cluster diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index b176ab4d6954..94a506e76ec0 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -235,6 +235,7 @@ ss::future partition_manager::manage( _feature_table, _tm_stm_cache_manager, _upload_hks, + _storage.kvs(), _max_concurrent_producer_ids, read_replica_bucket); diff --git a/src/v/cluster/persisted_stm.h b/src/v/cluster/persisted_stm.h index 7ee41c3f63d9..5f59094b41b3 100644 --- a/src/v/cluster/persisted_stm.h +++ b/src/v/cluster/persisted_stm.h @@ -157,7 +157,7 @@ class persisted_stm ss::sstring, ss::logger&, raft::consensus*, Args&&...); void make_snapshot_in_background() final; - ss::future<> ensure_snapshot_exists(model::offset) final; + ss::future<> ensure_snapshot_exists(model::offset) override; model::offset max_collectible_offset() override; ss::future> aborted_tx_ranges(model::offset, model::offset) override; diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 6766fd007ec3..c65b1c77e5aa 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -2761,6 +2761,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { _log_state.lru_idempotent_pids.push_back(it->second); } + _bootstrap_committed_offset = data.offset; _last_snapshot_offset = data.offset; _insync_offset = data.offset; } diff --git a/src/v/cluster/tests/CMakeLists.txt b/src/v/cluster/tests/CMakeLists.txt index 2f3e04e08da8..0c1649f5cd21 100644 --- a/src/v/cluster/tests/CMakeLists.txt +++ b/src/v/cluster/tests/CMakeLists.txt @@ -45,6 +45,7 @@ rp_test( endforeach() set(srcs + manual_log_deletion_test.cc cluster_tests.cc configuration_change_test.cc autocreate_test.cc diff --git a/src/v/raft/tests/manual_log_deletion_test.cc b/src/v/cluster/tests/manual_log_deletion_test.cc similarity index 68% rename from src/v/raft/tests/manual_log_deletion_test.cc rename to src/v/cluster/tests/manual_log_deletion_test.cc index ae427243640f..af1f4a5bad62 100644 --- a/src/v/raft/tests/manual_log_deletion_test.cc +++ b/src/v/cluster/tests/manual_log_deletion_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "cluster/log_eviction_stm.h" #include "config/configuration.h" #include "finjector/hbadger.h" #include "model/fundamental.h" @@ -35,6 +36,49 @@ struct manual_deletion_fixture : public raft_test_fixture { config::shard_local_cfg().log_segment_size_min.set_value( std::optional()); gr.enable_all(); + auto& members = gr.get_members(); + for (auto& [id, member] : members) { + maybe_init_eviction_stm(id); + } + } + + virtual ~manual_deletion_fixture() { + std::vector to_delete; + for (auto& [id, _] : gr.get_members()) { + to_delete.push_back(id); + } + for (auto id : to_delete) { + auto& member = gr.get_member(id); + if (member.started) { + gr.disable_node(id); + } + } + } + + void maybe_init_eviction_stm(model::node_id id) { + auto& member = gr.get_member(id); + if (member.log->config().is_collectable()) { + auto& kvstore = member.storage.local().kvs(); + auto eviction_stm = std::make_unique( + member.consensus.get(), tstlog, member._as, kvstore); + eviction_stm->start().get0(); + eviction_stms.emplace(id, std::move(eviction_stm)); + member.kill_eviction_stm_cb + = std::make_unique()>>( + 
[this, id = id]() { + tstlog.info("Stopping eviction stm: {}", id); + auto found = eviction_stms.find(id); + if (found != eviction_stms.end()) { + if (found->second != nullptr) { + return found->second->stop().then([this, id]() { + tstlog.info("eviction stm stopped: {}", id); + eviction_stms.erase(id); + }); + } + } + return ss::now(); + }); + } } void prepare_raft_group() { @@ -84,6 +128,7 @@ struct manual_deletion_fixture : public raft_test_fixture { to_delete.push_back(std::filesystem::path( gr.get_member(id).log->config().topic_directory())); gr.disable_node(id); + tstlog.info("node disabled: {}", id); } for (auto& path : to_delete) { std::filesystem::remove_all(path); @@ -91,6 +136,7 @@ struct manual_deletion_fixture : public raft_test_fixture { // enable back for (auto id : nodes) { gr.enable_node(id); + maybe_init_eviction_stm(id); } } @@ -102,9 +148,12 @@ struct manual_deletion_fixture : public raft_test_fixture { remove_data(nodes); } + using stm_map_t = std:: + unordered_map>; raft_group gr; model::timestamp retention_timestamp; ss::abort_source as; + stm_map_t eviction_stms; }; FIXTURE_TEST( diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index 239235b4d3bb..3008875db962 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -71,6 +71,8 @@ std::string_view to_string_view(feature f) { return "force_partition_reconfiguration"; case feature::raft_append_entries_serde: return "raft_append_entries_serde"; + case feature::delete_records: + return "delete_records"; /* * testing features diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index 490835857a88..49a1ccdec684 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -60,6 +60,7 @@ enum class feature : std::uint64_t { transaction_partitioning = 1ULL << 25U, force_partition_reconfiguration = 1ULL << 26U, raft_append_entries_serde = 1ULL << 28U, + delete_records = 1ULL << 29U, // Dummy features for testing only test_alpha = 1ULL << 61U, @@ -272,6 +273,12 @@ constexpr static std::array feature_schema{ feature::raft_append_entries_serde, feature_spec::available_policy::always, feature_spec::prepare_policy::always}, + feature_spec{ + cluster::cluster_version{10}, + "delete_records", + feature::delete_records, + feature_spec::available_policy::always, + feature_spec::prepare_policy::always}, }; std::string_view to_string_view(feature); diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index 916b4b3193b0..3b9489543ea8 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -22,6 +22,7 @@ set(handlers_srcs server/handlers/handler_interface.cc server/handlers/topics/types.cc server/handlers/topics/topic_utils.cc + server/handlers/delete_records.cc server/handlers/describe_producers.cc server/handlers/describe_transactions.cc server/handlers/handler_probe.cc diff --git a/src/v/kafka/protocol/delete_records.h b/src/v/kafka/protocol/delete_records.h new file mode 100644 index 000000000000..2f3431cd43e8 --- /dev/null +++ b/src/v/kafka/protocol/delete_records.h @@ -0,0 +1,58 @@ +/* + * Copyright 2023 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "kafka/protocol/schemata/delete_records_request.h" +#include "kafka/protocol/schemata/delete_records_response.h" +#include "kafka/types.h" + +namespace kafka { + +struct delete_records_request final { + using api_type = delete_records_api; + + delete_records_request_data data; + + void encode(protocol::encoder& writer, api_version version) { + data.encode(writer, version); + } + + void decode(protocol::decoder& reader, api_version version) { + data.decode(reader, version); + } + + friend std::ostream& + operator<<(std::ostream& os, const delete_records_request& r) { + return os << r.data; + } +}; + +struct delete_records_response final { + using api_type = delete_records_api; + + delete_records_response_data data; + + void encode(protocol::encoder& writer, api_version version) { + data.encode(writer, version); + } + + void decode(iobuf buf, api_version version) { + data.decode(std::move(buf), version); + } + + friend std::ostream& + operator<<(std::ostream& os, const delete_records_response& r) { + return os << r.data; + } +}; + +} // namespace kafka diff --git a/src/v/kafka/protocol/schemata/CMakeLists.txt b/src/v/kafka/protocol/schemata/CMakeLists.txt index a9f78e901710..03fffdbdecc2 100644 --- a/src/v/kafka/protocol/schemata/CMakeLists.txt +++ b/src/v/kafka/protocol/schemata/CMakeLists.txt @@ -73,6 +73,8 @@ set(schemata add_offsets_to_txn_response.json offset_delete_request.json offset_delete_response.json + delete_records_request.json + delete_records_response.json offset_for_leader_epoch_request.json offset_for_leader_epoch_response.json alter_partition_reassignments_request.json diff --git a/src/v/kafka/protocol/schemata/delete_records_request.json b/src/v/kafka/protocol/schemata/delete_records_request.json new file mode 100644 index 000000000000..06a12d85c8bb --- /dev/null +++ b/src/v/kafka/protocol/schemata/delete_records_request.json @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 21, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "DeleteRecordsRequest", + // Version 1 is the same as version 0. + + // Version 2 is the first flexible version. + "validVersions": "0-2", + "flexibleVersions": "2+", + "fields": [ + { "name": "Topics", "type": "[]DeleteRecordsTopic", "versions": "0+", + "about": "Each topic that we want to delete records from.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." 
}, + { "name": "Partitions", "type": "[]DeleteRecordsPartition", "versions": "0+", + "about": "Each partition that we want to delete records from.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "Offset", "type": "int64", "versions": "0+", + "about": "The deletion offset." } + ]} + ]}, + { "name": "TimeoutMs", "type": "int32", "versions": "0+", + "about": "How long to wait for the deletion to complete, in milliseconds." } + ] +} diff --git a/src/v/kafka/protocol/schemata/delete_records_response.json b/src/v/kafka/protocol/schemata/delete_records_response.json new file mode 100644 index 000000000000..bfc0a5639087 --- /dev/null +++ b/src/v/kafka/protocol/schemata/delete_records_response.json @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 21, + "type": "response", + "name": "DeleteRecordsResponse", + // Starting in version 1, on quota violation, brokers send out responses before throttling. + + // Version 2 is the first flexible version. + "validVersions": "0-2", + "flexibleVersions": "2+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Topics", "type": "[]DeleteRecordsTopicResult", "versions": "0+", + "about": "Each topic that we wanted to delete records from.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]DeleteRecordsPartitionResult", "versions": "0+", + "about": "Each partition that we wanted to delete records from.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true, + "about": "The partition index." }, + { "name": "LowWatermark", "type": "int64", "versions": "0+", + "about": "The partition low water mark." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The deletion error code, or 0 if the deletion succeeded." 
} + ]} + ]} + ] +} diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py index 2962757ec4e9..cd08ebf2d916 100755 --- a/src/v/kafka/protocol/schemata/generator.py +++ b/src/v/kafka/protocol/schemata/generator.py @@ -309,6 +309,25 @@ } } }, + "DeleteRecordsRequestData": { + "Topics": { + "Partitions": { + "PartitionIndex": ("model::partition_id", "int32"), + "Offset": ("model::offset", "int64"), + } + }, + "TimeoutMs": ("std::chrono::milliseconds", "int32") + }, + "DeleteRecordsResponseData": { + "ThrottleTimeMs": ("std::chrono::milliseconds", "int32"), + "Topics": { + "Partitions": { + "PartitionIndex": ("model::partition_id", "int32"), + "LowWatermark": ("model::offset", "int64"), + "ErrorCode": ("kafka::error_code", "int16") + } + } + }, "AlterPartitionReassignmentsRequestData": { "TimeoutMs": ("std::chrono::milliseconds", "int32"), "Topics": { @@ -555,6 +574,10 @@ def make_context_field(path): "DescribeTransactionState", "TopicData", "ListTransactionState", + "DeleteRecordsTopic", + "DeleteRecordsPartition", + "DeleteRecordsTopicResult", + "DeleteRecordsPartitionResult", ] # a list of struct types which are ineligible to have default-generated diff --git a/src/v/kafka/server/handlers/delete_records.cc b/src/v/kafka/server/handlers/delete_records.cc new file mode 100644 index 000000000000..0fb70501542a --- /dev/null +++ b/src/v/kafka/server/handlers/delete_records.cc @@ -0,0 +1,236 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "kafka/server/handlers/delete_records.h" + +#include "cluster/metadata_cache.h" +#include "cluster/partition_manager.h" +#include "cluster/shard_table.h" +#include "kafka/server/partition_proxy.h" +#include "model/fundamental.h" +#include "model/ktp.h" + +#include + +namespace kafka { + +namespace { + +/// Returned in responses where kafka::error_code is anything other than a value +/// of error_code::none +constexpr auto invalid_low_watermark = model::offset(-1); + +/// Compared against the user-provided value of the truncation offset; this value will +/// indicate to truncate at the current partition high watermark +constexpr auto at_current_high_watermark = model::offset(-1); + +std::vector +make_partition_errors(const delete_records_topic& t, error_code ec) { + std::vector r; + for (const auto& p : t.partitions) { + r.push_back(delete_records_partition_result{ + .partition_index = p.partition_index, + .low_watermark = invalid_low_watermark, + .error_code = ec}); + } + return r; +} + +/// Performs validation of topics; any failures will result in a list of +/// partitions that all contain identical error codes +std::vector +validate_at_topic_level(request_context& ctx, const delete_records_topic& t) { + const auto is_authorized = [&ctx](const delete_records_topic& t) { + return ctx.authorized(security::acl_operation::remove, t.name); + }; + const auto is_deletable = [](const cluster::topic_configuration& cfg) { + /// Imitates the logic in ntp_config::is_collectible + if ( + !cfg.properties.has_overrides() + || !cfg.properties.cleanup_policy_bitflags) { + return true; + } + const auto& bitflags = cfg.properties.cleanup_policy_bitflags; + return (*bitflags & model::cleanup_policy_bitflags::deletion) + == 
model::cleanup_policy_bitflags::deletion; + }; + const auto is_cloud_enabled = [](const cluster::topic_configuration& cfg) { + const auto& si_flags = cfg.properties.shadow_indexing; + return si_flags && *si_flags != model::shadow_indexing_mode::disabled; + }; + const auto is_nodelete_topic = [](const delete_records_topic& t) { + const auto& nodelete_topics + = config::shard_local_cfg().kafka_nodelete_topics(); + return std::find_if( + nodelete_topics.begin(), + nodelete_topics.end(), + [t](const ss::sstring& name) { return name == t.name; }) + != nodelete_topics.end(); + }; + + const auto cfg = ctx.metadata_cache().get_topic_cfg( + model::topic_namespace_view(model::kafka_namespace, t.name)); + if (!cfg) { + return make_partition_errors(t, error_code::unknown_topic_or_partition); + } + if (!is_authorized(t)) { + return make_partition_errors(t, error_code::topic_authorization_failed); + } else if (!is_deletable(*cfg)) { + return make_partition_errors(t, error_code::policy_violation); + } else if (is_cloud_enabled(*cfg) || is_nodelete_topic(t)) { + return make_partition_errors(t, error_code::invalid_topic_exception); + } + return {}; +} + +/// Result set includes topic for later group-by topic logic +using result_t = std::tuple; + +result_t make_partition_error(const model::ktp& ktp, error_code err) { + return std::make_tuple( + ktp.get_topic(), + delete_records_partition_result{ + .partition_index = ktp.get_partition(), + .low_watermark = invalid_low_watermark, + .error_code = err}); +} + +result_t +make_partition_response(const model::ktp& ktp, model::offset low_watermark) { + return std::make_tuple( + ktp.get_topic(), + delete_records_partition_result{ + .partition_index = ktp.get_partition(), + .low_watermark = low_watermark, + .error_code = error_code::none}); +} + +/// If validation passes, attempts to prefix truncate the raft log at the given +/// offset. Returns a response that includes the new low watermark +ss::future prefix_truncate( + cluster::partition_manager& pm, + model::ktp ktp, + model::offset kafka_truncation_offset, + std::chrono::milliseconds timeout_ms) { + auto raw_partition = pm.get(ktp); + if (!raw_partition) { + co_return make_partition_error( + ktp, error_code::unknown_topic_or_partition); + } + auto partition = make_partition_proxy(ktp, pm); + if (!partition->is_leader()) { + co_return make_partition_error( + ktp, error_code::not_leader_for_partition); + } + if (kafka_truncation_offset == at_current_high_watermark) { + /// User is requesting to truncate all data + kafka_truncation_offset = partition->high_watermark(); + } + if (kafka_truncation_offset < model::offset(0)) { + co_return make_partition_error(ktp, error_code::offset_out_of_range); + } + + /// Perform truncation at the requested offset. 
A special batch will be + /// written to the log, eventually consumed by replicas via the + /// new log_eviction_stm, which will perform a prefix truncation at the + /// given offset + auto errc = co_await partition->prefix_truncate( + kafka_truncation_offset, ss::lowres_clock::now() + timeout_ms); + if (errc != error_code::none) { + vlog( + klog.info, + "Possibly failed attempt to truncate partition: {} error: {}", + ktp, + errc); + co_return make_partition_error(ktp, errc); + } + /// It's ok to not call sync_start_offset() over start_offset() here because + /// prefix_truncate() was called on the leader (this node) and it waits + /// until the local start offset has been updated + const auto kafka_start_offset = partition->start_offset(); + vlog( + klog.info, + "Truncated partition: {} to offset: {}", + ktp, + kafka_start_offset); + /// prefix_truncate() will return when the start_offset has been incremented + /// to the desired new low watermark. No other guarantees about the system + /// are made at this point (i.e. whether data on disk on this node or a replica + /// has been deleted yet). + co_return make_partition_response(ktp, kafka_start_offset); +} + +} // namespace + +template<> +ss::future +delete_records_handler::handle(request_context ctx, ss::smp_service_group) { + delete_records_request request; + request.decode(ctx.reader(), ctx.header().version); + log_request(ctx.header(), request); + + delete_records_response response; + std::vector> fs; + for (auto& topic : request.data.topics) { + /// Topic level validation; errors will all be the same for each + /// partition under the topic. Validation for individual partitions may + /// happen in the inner for loop below. + auto topic_level_errors = validate_at_topic_level(ctx, topic); + if (!topic_level_errors.empty()) { + response.data.topics.push_back(delete_records_topic_result{ + .name = topic.name, .partitions = std::move(topic_level_errors)}); + continue; + } + for (auto& partition : topic.partitions) { + auto ktp = model::ktp(topic.name, partition.partition_index); + auto shard = ctx.shards().shard_for(ktp); + if (!shard) { + fs.push_back( + ss::make_ready_future(make_partition_error( + ktp, error_code::unknown_topic_or_partition))); + continue; + } + auto f + = ctx.partition_manager() + .invoke_on( + *shard, + [ktp, + timeout = request.data.timeout_ms, + o = partition.offset](cluster::partition_manager& pm) { + return prefix_truncate(pm, ktp, o, timeout); + }) + .handle_exception([ktp](std::exception_ptr eptr) { + vlog(klog.error, "Caught unexpected exception: {}", eptr); + return make_partition_error( + ktp, error_code::unknown_server_error); + }); + fs.push_back(std::move(f)); + } + } + + /// Perform prefix truncation on partitions + auto results = co_await ss::when_all_succeed(fs.begin(), fs.end()); + + /// Group results by topic + using partition_results = std::vector; + absl::flat_hash_map group_by_topic; + for (auto& [name, partitions] : results) { + group_by_topic[name].push_back(std::move(partitions)); + } + + /// Map to kafka response type + for (auto& [topic, partition_results] : group_by_topic) { + response.data.topics.push_back(delete_records_topic_result{ + .name = topic, .partitions = std::move(partition_results)}); + } + co_return co_await ctx.respond(std::move(response)); +} +} // namespace kafka diff --git a/src/v/kafka/server/handlers/delete_records.h b/src/v/kafka/server/handlers/delete_records.h new file mode 100644 index 000000000000..ba63f22b8ca2 --- /dev/null +++ b/src/v/kafka/server/handlers/delete_records.h
@@ -0,0 +1,20 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once +#include "kafka/protocol/delete_records.h" +#include "kafka/server/handlers/handler.h" +#include "kafka/server/response.h" + +namespace kafka { + +using delete_records_handler = single_stage_handler; + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/handlers.h b/src/v/kafka/server/handlers/handlers.h index 1f12e3ae7502..de8e85dfedb4 100644 --- a/src/v/kafka/server/handlers/handlers.h +++ b/src/v/kafka/server/handlers/handlers.h @@ -20,6 +20,7 @@ #include "kafka/server/handlers/create_topics.h" #include "kafka/server/handlers/delete_acls.h" #include "kafka/server/handlers/delete_groups.h" +#include "kafka/server/handlers/delete_records.h" #include "kafka/server/handlers/delete_topics.h" #include "kafka/server/handlers/describe_acls.h" #include "kafka/server/handlers/describe_configs.h" @@ -70,6 +71,7 @@ using request_types = make_request_types< api_versions_handler, join_group_handler, heartbeat_handler, + delete_records_handler, leave_group_handler, sync_group_handler, create_topics_handler, diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc index ba26643562fb..6891949c0e14 100644 --- a/src/v/kafka/server/handlers/list_offsets.cc +++ b/src/v/kafka/server/handlers/list_offsets.cc @@ -13,6 +13,7 @@ #include "cluster/partition_manager.h" #include "cluster/shard_table.h" #include "kafka/protocol/errors.h" +#include "kafka/server/errors.h" #include "kafka/server/handlers/details/leader_epoch.h" #include "kafka/server/partition_proxy.h" #include "kafka/server/replicated_partition.h" @@ -106,10 +107,18 @@ static ss::future list_offsets_partition( * that the actual timestamp be returned. only the offset is required. 
*/ if (timestamp == list_offsets_request::earliest_timestamp) { + // verify that the leader is up to date so that it is guaranteed to be + // working with the most up to date value of start offset + auto maybe_start_ofs = co_await kafka_partition->sync_effective_start(); + if (!maybe_start_ofs) { + co_return list_offsets_response::make_partition( + ktp.get_partition(), maybe_start_ofs.error()); + } + co_return list_offsets_response::make_partition( ktp.get_partition(), model::timestamp(-1), - kafka_partition->start_offset(), + maybe_start_ofs.value(), kafka_partition->leader_epoch()); } else if (timestamp == list_offsets_request::latest_timestamp) { diff --git a/src/v/kafka/server/partition_proxy.h b/src/v/kafka/server/partition_proxy.h index 5849cebf184b..3b8da0eff46d 100644 --- a/src/v/kafka/server/partition_proxy.h +++ b/src/v/kafka/server/partition_proxy.h @@ -32,6 +32,8 @@ class partition_proxy { public: struct impl { virtual const model::ntp& ntp() const = 0; + virtual ss::future> + sync_effective_start(model::timeout_clock::duration) = 0; virtual model::offset start_offset() const = 0; virtual model::offset high_watermark() const = 0; virtual checked @@ -42,6 +44,8 @@ class partition_proxy { virtual bool is_elected_leader() const = 0; virtual bool is_leader() const = 0; virtual ss::future linearizable_barrier() = 0; + virtual ss::future + prefix_truncate(model::offset, ss::lowres_clock::time_point) = 0; virtual ss::future make_reader( storage::log_reader_config, std::optional) @@ -66,6 +70,11 @@ class partition_proxy { explicit partition_proxy(std::unique_ptr impl) noexcept : _impl(std::move(impl)) {} + ss::future> sync_effective_start( + model::timeout_clock::duration timeout = std::chrono::seconds(5)) { + return _impl->sync_effective_start(timeout); + } + model::offset start_offset() const { return _impl->start_offset(); } model::offset high_watermark() const { return _impl->high_watermark(); } @@ -78,6 +87,11 @@ class partition_proxy { return _impl->linearizable_barrier(); } + ss::future + prefix_truncate(model::offset o, ss::lowres_clock::time_point deadline) { + return _impl->prefix_truncate(o, deadline); + } + bool is_elected_leader() const { return _impl->is_elected_leader(); } bool is_leader() const { return _impl->is_leader(); } diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index e0d6dcdf0bae..8dcc5415856a 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -306,6 +306,48 @@ replicated_partition::get_leader_epoch_last_offset( co_return _translator->from_log_offset(first_local_offset); } +ss::future replicated_partition::prefix_truncate( + model::offset kafka_truncation_offset, + ss::lowres_clock::time_point deadline) { + if ( + kafka_truncation_offset <= start_offset() + || kafka_truncation_offset > high_watermark()) { + co_return error_code::offset_out_of_range; + } + const auto rp_truncate_offset = _translator->to_log_offset( + kafka_truncation_offset); + auto errc = co_await _partition->prefix_truncate( + rp_truncate_offset, deadline); + + /// Translate any std::error_codes into proper kafka error codes + auto kerr = error_code::unknown_server_error; + if (errc.category() == cluster::error_category()) { + switch (cluster::errc(errc.value())) { + case cluster::errc::success: + kerr = error_code::none; + break; + case cluster::errc::timeout: + case cluster::errc::shutting_down: + kerr = error_code::request_timed_out; + break; + case 
cluster::errc::topic_invalid_config: + case cluster::errc::feature_disabled: + default: + vlog( + klog.error, + "Unhandled cluster::errc encountered: {}", + errc.value()); + kerr = error_code::unknown_server_error; + } + } else { + vlog( + klog.error, + "Unhandled error_category encountered: {}", + errc.category().name()); + } + co_return kerr; +} + ss::future replicated_partition::validate_fetch_offset( model::offset fetch_offset, bool reading_from_follower, @@ -322,17 +364,6 @@ ss::future replicated_partition::validate_fetch_offset( * receive data. */ - // Calculate log end in kafka offset domain - std::optional log_end; - if (_partition->dirty_offset() >= _partition->start_offset()) { - // Translate the raft dirty offset to find the kafka dirty offset. This - // is conditional on dirty offset being ahead of start offset, because - // if it isn't, then the log is empty and we do not need to check for - // the case of a fetch between hwm and dirty offset. (Issue #7758) - log_end = model::next_offset( - _translator->from_log_offset(_partition->dirty_offset())); - } - // offset validation logic on follower if (reading_from_follower && !_partition->is_leader()) { if (fetch_offset < start_offset()) { @@ -348,42 +379,20 @@ ss::future replicated_partition::validate_fetch_offset( } co_return error_code::none; } - while (log_end.has_value() && fetch_offset > high_watermark() - && fetch_offset <= log_end) { - if (model::timeout_clock::now() > deadline) { - break; - } - // retry linearizable barrier to make sure node is still a leader - auto ec = co_await linearizable_barrier(); - if (ec) { - /** - * when partition is shutting down we may not be able to correctly - * validate consumer requested offset. Return - * not_leader_for_partition error to force client to retry instead - * of out of range error that is forcing consumers to reset their - * offset. - */ - if ( - ec == raft::errc::shutting_down - || ec == cluster::errc::shutting_down) { - co_return error_code::not_leader_for_partition; - } - vlog( - klog.warn, - "error updating partition high watermark with linearizable " - "barrier - {}", - ec.message()); - break; - } + + // Grab the up to date start offset + auto timeout = deadline - model::timeout_clock::now(); + auto start_offset = co_await sync_effective_start(timeout); + if (!start_offset) { vlog( - klog.debug, - "updated partition highwatermark with linearizable barrier. " - "start offset: {}, hight watermark: {}", - _partition->start_offset(), - _partition->high_watermark()); + klog.warn, + "error obtaining latest start offset - {}", + start_offset.error()); + co_return start_offset.error(); } - co_return fetch_offset >= start_offset() && fetch_offset <= log_end_offset() + co_return fetch_offset >= start_offset.value() + && fetch_offset <= log_end_offset() ? error_code::none : error_code::offset_out_of_range; } diff --git a/src/v/kafka/server/replicated_partition.h b/src/v/kafka/server/replicated_partition.h index 8d720191f39e..b0552d4cf823 100644 --- a/src/v/kafka/server/replicated_partition.h +++ b/src/v/kafka/server/replicated_partition.h @@ -37,6 +37,46 @@ class replicated_partition final : public kafka::partition_proxy::impl { const model::ntp& ntp() const final { return _partition->ntp(); } + ss::future> + sync_effective_start(model::timeout_clock::duration timeout) final { + if ( + _partition->is_read_replica_mode_enabled() + && _partition->cloud_data_available()) { + // Always assume remote read in this case. 
+ co_return _partition->start_cloud_offset(); + } + + auto synced_local_start_offset + = co_await _partition->sync_effective_start(timeout); + if (!synced_local_start_offset) { + auto err = synced_local_start_offset.error(); + auto error_code = error_code::unknown_server_error; + if (err.category() == cluster::error_category()) { + switch (cluster::errc(err.value())) { + case cluster::errc::shutting_down: + case cluster::errc::not_leader: + error_code = error_code::not_leader_for_partition; + break; + case cluster::errc::timeout: + error_code = error_code::request_timed_out; + break; + default: + error_code = error_code::unknown_server_error; + } + } + co_return error_code; + } + auto local_kafka_start_offset = _translator->from_log_offset( + synced_local_start_offset.value()); + if ( + _partition->is_remote_fetch_enabled() + && _partition->cloud_data_available() + && (_partition->start_cloud_offset() < local_kafka_start_offset)) { + co_return _partition->start_cloud_offset(); + } + co_return local_kafka_start_offset; + } + model::offset start_offset() const final { if ( _partition->is_read_replica_mode_enabled() @@ -112,6 +152,9 @@ class replicated_partition final : public kafka::partition_proxy::impl { bool is_leader() const final { return _partition->is_leader(); } + ss::future + prefix_truncate(model::offset, ss::lowres_clock::time_point) final; + ss::future linearizable_barrier() final { auto r = co_await _partition->linearizable_barrier(); if (r) { diff --git a/src/v/model/model.cc b/src/v/model/model.cc index eae0534a08cd..195a62ab8fed 100644 --- a/src/v/model/model.cc +++ b/src/v/model/model.cc @@ -357,6 +357,8 @@ std::ostream& operator<<(std::ostream& o, record_batch_type bt) { return o << "batch_type::version_fence"; case record_batch_type::tx_tm_hosted_trasactions: return o << "batch_type::tx_tm_hosted_trasactions"; + case record_batch_type::prefix_truncate: + return o << "batch_type::prefix_truncate"; } return o << "batch_type::unknown{" << static_cast(bt) << "}"; diff --git a/src/v/model/record_batch_types.h b/src/v/model/record_batch_types.h index 3444ed6b65d5..4a9f253d725a 100644 --- a/src/v/model/record_batch_types.h +++ b/src/v/model/record_batch_types.h @@ -42,7 +42,8 @@ enum class record_batch_type : int8_t { cluster_bootstrap_cmd = 22, // cluster bootsrap command version_fence = 23, // version fence/epoch tx_tm_hosted_trasactions = 24, // tx_tm_hosted_trasactions_batch_type - MAX = tx_tm_hosted_trasactions + prefix_truncate = 25, // log prefix truncation type + MAX = prefix_truncate }; std::ostream& operator<<(std::ostream& o, record_batch_type bt); diff --git a/src/v/raft/CMakeLists.txt b/src/v/raft/CMakeLists.txt index 838d6562420c..1d114f9f043f 100644 --- a/src/v/raft/CMakeLists.txt +++ b/src/v/raft/CMakeLists.txt @@ -27,7 +27,6 @@ v_cc_library( offset_monitor.cc event_manager.cc state_machine.cc - log_eviction_stm.cc configuration_manager.cc group_configuration.cc append_entries_buffer.cc diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 5c7f35898463..380f77878fb5 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -80,7 +80,8 @@ offset_translator_batch_types(const model::ntp& ntp) { return { model::record_batch_type::raft_configuration, model::record_batch_type::archival_metadata, - model::record_batch_type::version_fence}; + model::record_batch_type::version_fence, + model::record_batch_type::prefix_truncate}; } else { return {}; } diff --git a/src/v/raft/log_eviction_stm.cc b/src/v/raft/log_eviction_stm.cc deleted file mode 
100644 index 16b5ff468cf6..000000000000 --- a/src/v/raft/log_eviction_stm.cc +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2020 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -#include "raft/log_eviction_stm.h" - -#include "raft/consensus.h" -#include "raft/types.h" - -#include - -namespace raft { - -log_eviction_stm::log_eviction_stm( - consensus* raft, - ss::logger& logger, - ss::lw_shared_ptr stm_manager, - ss::abort_source& as) - : _raft(raft) - , _logger(logger) - , _stm_manager(std::move(stm_manager)) - , _as(as) {} - -ss::future<> log_eviction_stm::start() { - monitor_log_eviction(); - return ss::now(); -} - -ss::future<> log_eviction_stm::stop() { return _gate.close(); } - -void log_eviction_stm::monitor_log_eviction() { - ssx::spawn_with_gate(_gate, [this] { - return ss::do_until( - [this] { return _gate.is_closed(); }, - [this] { - return _raft->monitor_log_eviction(_as) - .then([this](model::offset last_evicted) { - return handle_deletion_notification(last_evicted); - }) - .handle_exception_type( - [](const ss::abort_requested_exception&) { - // ignore abort requested exception, shutting down - }) - .handle_exception_type([](const ss::gate_closed_exception&) { - // ignore gate closed exception, shutting down - }) - .handle_exception([this](std::exception_ptr e) { - vlog( - _logger.trace, - "Error handling log eviction - {}, ntp: {}", - e, - _raft->ntp()); - }); - }); - }); -} - -ss::future<> -log_eviction_stm::handle_deletion_notification(model::offset last_evicted) { - vlog( - _logger.trace, - "Handling log deletion notification for offset: {}, ntp: {}", - last_evicted, - _raft->ntp()); - - // Store where the local storage layer has requested eviction up to, - // irrespective of whether we can actually evict up to this point yet. - _requested_eviction_offset = last_evicted; - - model::offset max_collectible_offset - = _stm_manager->max_collectible_offset(); - if (last_evicted > max_collectible_offset) { - vlog( - _logger.trace, - "Can only evict up to offset: {}, ntp: {}", - max_collectible_offset, - _raft->ntp()); - last_evicted = max_collectible_offset; - } - - // do nothing, we already taken the snapshot - if (last_evicted <= _previous_eviction_offset) { - return ss::now(); - } - - // persist empty snapshot, we can have no timeout in here as we are passing - // in an abort source - _previous_eviction_offset = last_evicted; - - return _raft->visible_offset_monitor() - .wait(last_evicted, model::no_timeout, _as) - .then([this, last_evicted]() mutable { - auto f = ss::now(); - - if (_stm_manager) { - f = _raft->refresh_commit_index().then([this, last_evicted] { - return _stm_manager->ensure_snapshot_exists(last_evicted); - }); - } - - return f.then([this, last_evicted]() { - return _raft->write_snapshot( - write_snapshot_cfg(last_evicted, iobuf())); - }); - }); -} -} // namespace raft diff --git a/src/v/raft/log_eviction_stm.h b/src/v/raft/log_eviction_stm.h deleted file mode 100644 index babfe6a4ee23..000000000000 --- a/src/v/raft/log_eviction_stm.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2020 Redpanda Data, Inc. 
- * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -#pragma once -#include "config/configuration.h" -#include "model/fundamental.h" -#include "seastarx.h" -#include "storage/types.h" - -#include -#include -#include - -namespace raft { - -class consensus; - -/** - * Responsible for taking snapshots triggered by underlying log segments - * eviction. - * - * The process goes like this: storage layer will send a "deletion notification" - * - a request to evict log up to a certain offset. log_eviction_stm will then - * adjust that offset with _stm_manager->max_collectible_offset(), write the - * raft snapshot and notify the storage layer that log eviction can safely - * proceed up to the adjusted offset. - */ -class log_eviction_stm { -public: - log_eviction_stm( - consensus*, - ss::logger&, - ss::lw_shared_ptr, - ss::abort_source&); - - ss::future<> start(); - - ss::future<> stop(); - - /// Return the offset up to which the storage layer would like to - /// prefix truncate the log, if any. - std::optional eviction_requested_offset() const { - if (_requested_eviction_offset == model::offset{}) { - return std::nullopt; - } else { - return _requested_eviction_offset; - } - } - -private: - ss::future<> handle_deletion_notification(model::offset); - void monitor_log_eviction(); - - consensus* _raft; - ss::logger& _logger; - ss::lw_shared_ptr _stm_manager; - ss::abort_source& _as; - ss::gate _gate; - model::offset _previous_eviction_offset; - model::offset _requested_eviction_offset; -}; - -} // namespace raft diff --git a/src/v/raft/tests/CMakeLists.txt b/src/v/raft/tests/CMakeLists.txt index 284689a680b7..6996f28f5eb5 100644 --- a/src/v/raft/tests/CMakeLists.txt +++ b/src/v/raft/tests/CMakeLists.txt @@ -32,7 +32,6 @@ set(srcs offset_monitor_test.cc mux_state_machine_test.cc mutex_buffer_test.cc - manual_log_deletion_test.cc state_removal_test.cc configuration_manager_test.cc coordinated_recovery_throttle_test.cc diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h index 42f950e3c51a..339e978ebf68 100644 --- a/src/v/raft/tests/raft_group_fixture.h +++ b/src/v/raft/tests/raft_group_fixture.h @@ -22,7 +22,6 @@ #include "raft/consensus.h" #include "raft/consensus_client_protocol.h" #include "raft/heartbeat_manager.h" -#include "raft/log_eviction_stm.h" #include "raft/rpc_client_protocol.h" #include "raft/service.h" #include "random/generators.h" @@ -221,14 +220,6 @@ struct raft_node { hbeats->register_group(consensus).get(); started = true; consensus->start().get0(); - if (log->config().is_collectable()) { - _nop_stm = std::make_unique( - consensus.get(), - tstlog, - ss::make_lw_shared(), - _as); - _nop_stm->start().get0(); - } } ss::future<> stop_node() { @@ -261,8 +252,9 @@ struct raft_node { return consensus->stop(); }) .then([this] { - if (_nop_stm != nullptr) { - return _nop_stm->stop(); + if (kill_eviction_stm_cb) { + return (*kill_eviction_stm_cb)().then( + [this] { kill_eviction_stm_cb = nullptr; }); } return ss::now(); }) @@ -356,11 +348,12 @@ struct raft_node { ss::sharded cache; ss::sharded server; ss::sharded raft_manager; + std::unique_ptr()>> + kill_eviction_stm_cb; leader_clb_t leader_callback; raft::recovery_memory_quota recovery_mem_quota; std::unique_ptr hbeats; consensus_ptr consensus; - 
std::unique_ptr _nop_stm; ss::sharded feature_table; ss::abort_source _as; }; @@ -919,6 +912,8 @@ struct raft_test_fixture { }).get(); } + virtual ~raft_test_fixture(){}; + consensus_ptr get_leader_raft(raft_group& gr) { auto leader_id = gr.get_leader_id(); if (!leader_id) { diff --git a/src/v/raft/tests/state_removal_test.cc b/src/v/raft/tests/state_removal_test.cc index 5871c8b26097..9790da01bbc2 100644 --- a/src/v/raft/tests/state_removal_test.cc +++ b/src/v/raft/tests/state_removal_test.cc @@ -64,9 +64,6 @@ void stop_node(raft_node& node) { node.recovery_throttle.stop().get(); node.server.stop().get0(); node._as.request_abort(); - if (node._nop_stm != nullptr) { - node._nop_stm->stop().get0(); - } node.raft_manager.stop().get0(); node.consensus = nullptr; node.hbeats->stop().get0(); diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 7ceb558c5a9a..e67545314c78 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -51,7 +51,7 @@ using namespace std::literals::chrono_literals; -namespace { +namespace storage { /* * Some logs must be exempt from the cleanup=delete policy such that their full * history is retained. This function explicitly protects against any accidental @@ -68,9 +68,6 @@ bool deletion_exempt(const model::ntp& ntp) { && ntp.tp.topic == model::tx_manager_topic; return !is_tx_manager_ntp && is_internal_namespace; } -} // namespace - -namespace storage { disk_log_impl::disk_log_impl( ntp_config cfg, @@ -1318,6 +1315,26 @@ disk_log_impl::get_term_last_offset(model::term_id term) const { return std::nullopt; } +std::optional +disk_log_impl::index_lower_bound(model::offset o) const { + if (unlikely(_segs.empty())) { + return std::nullopt; + } + auto it = _segs.lower_bound(o); + if (it == _segs.end()) { + return std::nullopt; + } + auto& idx = (*it)->index(); + if (idx.max_offset() == o) { + // input already lies on a boundary + return o; + } + if (auto entry = idx.find_nearest(o)) { + return model::prev_offset(entry->offset); + } + return std::nullopt; +} + ss::future> disk_log_impl::timequery(timequery_config cfg) { vassert(!_closed, "timequery on closed log - {}", *this); diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 8174a65e909c..36dc8eb200e2 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -90,6 +90,7 @@ class disk_log_impl final : public log::impl { std::optional get_term(model::offset) const final; std::optional get_term_last_offset(model::term_id term) const final; + std::optional index_lower_bound(model::offset o) const final; std::ostream& print(std::ostream&) const final; ss::future<> maybe_roll( diff --git a/src/v/storage/log.h b/src/v/storage/log.h index 1236b937a1c9..9cf2f4cab648 100644 --- a/src/v/storage/log.h +++ b/src/v/storage/log.h @@ -81,6 +81,8 @@ class log final { virtual std::optional get_term(model::offset) const = 0; virtual std::optional get_term_last_offset(model::term_id) const = 0; + virtual std::optional + index_lower_bound(model::offset o) const = 0; virtual ss::future monitor_eviction(ss::abort_source&) = 0; @@ -170,6 +172,10 @@ class log final { return _impl->get_term_last_offset(term); } + std::optional index_lower_bound(model::offset o) const { + return _impl->index_lower_bound(o); + } + ss::future> timequery(timequery_config cfg) { return _impl->timequery(cfg); @@ -232,4 +238,6 @@ log make_disk_backed_log( kvstore&, ss::sharded& feature_table); +bool deletion_exempt(const model::ntp& ntp); + } // namespace storage 
diff --git a/tests/rptest/clients/kcl.py b/tests/rptest/clients/kcl.py index f3df36174cda..ea09d15786e1 100644 --- a/tests/rptest/clients/kcl.py +++ b/tests/rptest/clients/kcl.py @@ -15,6 +15,7 @@ import itertools from typing import Optional from ducktape.utils.util import wait_until +from rptest.utils.functional import flat_map KclPartitionOffset = namedtuple( 'KclPartitionOffset', @@ -35,6 +36,10 @@ 'KclListPartitionReassignmentsResponse', ['topic', 'partition', 'replicas', 'adding_replicas', 'removing_replicas']) +KclDeleteRecordsResponse = namedtuple( + 'KclDeleteRecordsResponse', + ['broker', 'topic', 'partition', 'new_low_watermark', 'error']) + class KCL: def __init__(self, redpanda): @@ -217,6 +222,51 @@ def offset_delete(self, group: str, topic_partitions: dict): cmd = ['group', 'offset-delete', "-j", group] + request_args_w_flags return json.loads(self._cmd(cmd, attempts=5)) + def delete_records(self, + topic_partitions_offsets: dict[str, dict[int, int]], + timeout_ms=1000): + """ + topic_partitions_offsets: mapping of topic -> mapping of + partition_id to desired truncation offset + + kcl admin delete-records foo:p0,o120 foo:p1,o3888 ... + """ + def unfold(x): + topic, partitions_offsets = x + return [ + f"{topic}:p{partition},o{offset}" + for partition, offset in partitions_offsets.items() + ] + + if len(topic_partitions_offsets) == 0: + return [] + + cmd = ['-X', f'timeout_ms={timeout_ms}', 'admin', 'delete-records' + ] + flat_map(unfold, topic_partitions_offsets.items()) + # There's no json output support for this command + results = self._cmd(cmd, attempts=5).splitlines()[1:] + results = [x for x in results if len(x) > 0] + + def parse_line(line): + regex = re.compile(r"(\d+)\s+(.*)\s+(\d+)\s+(-?\d+)\s+(.*)") + error_regex = re.compile(r"(-?\d+)\s+(.*)") + matches = regex.match(line) + if matches is None: + err_matches = error_regex.match(line) + if err_matches is None: + raise RuntimeError(f"unexpected kcl output: {line}") + return KclDeleteRecordsResponse(None, None, None, + int(err_matches.group(1)), + err_matches.group(2).strip()) + + return KclDeleteRecordsResponse(int(matches.group(1)), + matches.group(2).strip(), + int(matches.group(3)), + int(matches.group(4)), + matches.group(5).strip()) + + return [parse_line(x.strip()) for x in results] + def get_user_credentials_cmd(self, user_cred: Optional[dict[str, str]] = None): if user_cred is not None: diff --git a/tests/rptest/tests/delete_records_test.py b/tests/rptest/tests/delete_records_test.py new file mode 100644 index 000000000000..97cdc21baf6d --- /dev/null +++ b/tests/rptest/tests/delete_records_test.py @@ -0,0 +1,558 @@ +# Copyright 2023 Redpanda Data, Inc.
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import time +import random +import signal +import string +import threading +import confluent_kafka as ck +import ducktape + +from ducktape.mark import parametrize +from rptest.services.cluster import cluster +from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer +from rptest.tests.redpanda_test import RedpandaTest +from rptest.clients.kcl import KCL +from rptest.clients.kafka_cli_tools import KafkaCliTools +from rptest.clients.rpk import RpkTool +from ducktape.utils.util import wait_until +from rptest.clients.types import TopicSpec +from rptest.util import produce_until_segments, segments_count, wait_for_removal_of_n_segments, search_logs_with_timeout +from rptest.services.storage import Segment + + +class DeleteRecordsTest(RedpandaTest): + """ + The purpose of this test is to exercise the delete-records API which + prefix truncates the log at a user-defined offset. + """ + topics = (TopicSpec(), ) + + def __init__(self, test_context): + extra_rp_conf = dict( + enable_leader_balancer=False, + log_compaction_interval_ms=5000, + log_segment_size=1048576, + ) + super(DeleteRecordsTest, self).__init__(test_context=test_context, + num_brokers=3, + extra_rp_conf=extra_rp_conf) + self._ctx = test_context + + self.kcl = KCL(self.redpanda) + self.rpk = RpkTool(self.redpanda) + + def get_topic_info(self): + topics_info = list(self.rpk.describe_topic(self.topic)) + self.logger.info(topics_info) + assert len(topics_info) == 1 + return topics_info[0] + + def wait_until_records(self, offset, timeout_sec=30, backoff_sec=1): + def expect_high_watermark(): + topic_info = self.get_topic_info() + return topic_info.high_watermark == offset + + wait_until(expect_high_watermark, + timeout_sec=timeout_sec, + backoff_sec=backoff_sec) + + def delete_records(self, topic, partition, truncate_offset): + """ + Makes a delete-records call with 1 topic partition in the request body. + + Asserts that the response contains 1 element, with matching topic & + partition contents. + """ + response = self.kcl.delete_records( + {topic: { + partition: truncate_offset + }}) + assert len(response) == 1 + assert response[0].topic == topic + assert response[0].partition == partition + assert response[0].error == 'OK', f"Err msg: {response[0].error}" + return response[0].new_low_watermark + + def assert_start_partition_boundaries(self, truncate_offset): + def check_bound_start(offset): + try: + r = self.rpk.consume(self.topic, + n=1, + offset=f'{offset}-{offset+1}', + quiet=True, + timeout=10) + return r.count('_') == 1 + except Exception as _: + return False + + assert check_bound_start( + truncate_offset + ), f"new log start: {truncate_offset} not consumable" + assert not check_bound_start( + truncate_offset - + 1), f"before log start: {truncate_offset - 1} is consumable" + + def assert_new_partition_boundaries(self, truncate_offset, high_watermark): + """ + Asserts that the partition contains records at the expected boundaries, + ensuring the truncation worked at the exact requested point and that the + number of remaining records is as expected.
+ """ + def check_bound_end(offset): + try: + # Not timing out means data was available to read + _ = self.rpk.consume(self.topic, + n=1, + offset=offset, + timeout=10) + except Exception as _: + return False + return True + + assert truncate_offset <= high_watermark, f"Test malformed" + + if truncate_offset == high_watermark: + # Assert no data at all can be read + assert not check_bound_end(truncate_offset) + return + + # truncate_offset is inclusive start of log + # high_watermark is exclusive end of log + # Readable offsets: [truncate_offset, high_watermark) + self.assert_start_partition_boundaries(truncate_offset) + assert check_bound_end( + high_watermark - + 1), f"log end: {high_watermark - 1} not consumable" + assert not check_bound_end( + high_watermark), f"high watermark: {high_watermark} is consumable" + + @cluster(num_nodes=3) + def test_delete_records_topic_start_delta(self): + """ + Test that delete-records moves forward the segment offset delta + + Perform this by creating only 1 segment and moving forward the start + offset within that segment, performing verifications at each step + """ + num_records = 10240 + records_size = 512 + truncate_offset_start = 100 + + # Produce some data, wait for it all to arrive + kafka_tools = KafkaCliTools(self.redpanda) + kafka_tools.produce(self.topic, num_records, records_size) + self.wait_until_records(num_records, timeout_sec=10, backoff_sec=1) + + # Call delete-records in a loop incrementing new point each time + for truncate_offset in range(truncate_offset_start, + truncate_offset_start + 5): + # Perform truncation + low_watermark = self.delete_records(self.topic, 0, truncate_offset) + assert low_watermark == truncate_offset, f"Expected low watermark: {truncate_offset} observed: {low_watermark}" + + # Assert correctness of start and end offsets in topic metadata + topic_info = self.get_topic_info() + assert topic_info.id == 0, f"Partition id: {topic_info.id}" + assert topic_info.start_offset == truncate_offset, f"Start offset: {topic_info.start_offset}" + assert topic_info.high_watermark == num_records, f"High watermark: {topic_info.high_watermark}" + + # ... and in actual fetch requests + self.assert_new_partition_boundaries(truncate_offset, + topic_info.high_watermark) + + @cluster(num_nodes=3) + @parametrize(truncate_point="at_segment_boundary") + @parametrize(truncate_point="within_segment") + @parametrize(truncate_point="one_below_high_watermark") + @parametrize(truncate_point="at_high_watermark") + def test_delete_records_segment_deletion(self, truncate_point: str): + """ + Test that prefix truncation actually deletes data + + In the case the desired truncation point passes multiple segments it + can be asserted that all of those segments will be deleted once the + log_eviction_stm processes the deletion event + """ + segments_to_write = 10 + + # Produce 10 segments, then issue a truncation, assert corect number + # of segments are deleted + produce_until_segments( + self.redpanda, + topic=self.topic, + partition_idx=0, + count=segments_to_write, + acks=-1, + ) + + # Grab a snapshot of the segments to determine the final segment + # indicies of all segments. This is to test corner cases where the + # user happens to pass a segment index as a truncation index. 
snapshot = self.redpanda.storage().segments_by_node( + "kafka", self.topic, 0) + assert len(snapshot) > 0, "empty snapshot" + self.redpanda.logger.info(f"Snapshot: {snapshot}") + + def get_segment_boundaries(node): + def to_final_indicies(seg): + return int(seg.data_file.split('-')[0]) - 1 + + indicies = [to_final_indicies(seg) for seg in node] + return sorted([idx for idx in indicies if idx != -1]) + + # Tests for 3 different types of scenarios + # 1. User wants to truncate all data - high_watermark + # 2. User wants to truncate at a segment boundary - at_segment_boundary + # 3. User wants to truncate within a segment - within_segment + def obtain_test_parameters(snapshot): + node = snapshot[list(snapshot.keys())[-1]] + segment_boundaries = get_segment_boundaries(node) + self.redpanda.logger.info( + f"Segment boundaries: {segment_boundaries}") + truncate_offset = None + expected_segments_removed = None + high_watermark = int(self.get_topic_info().high_watermark) + if truncate_point == "one_below_high_watermark": + truncate_offset = high_watermark - 1 # Leave 1 record + expected_segments_removed = len(segment_boundaries) + elif truncate_point == "at_high_watermark": + truncate_offset = high_watermark # Delete all data + expected_segments_removed = len(segment_boundaries) + elif truncate_point == "within_segment": + random_seg = random.randint(0, len(segment_boundaries) - 2) + truncate_offset = random.randint( + segment_boundaries[random_seg] + 1, + segment_boundaries[random_seg + 1] - 1) + expected_segments_removed = random_seg + elif truncate_point == "at_segment_boundary": + random_seg = random.randint(0, len(segment_boundaries) - 1) + truncate_offset = segment_boundaries[random_seg] + expected_segments_removed = random_seg + else: + assert False, "unknown test parameter encountered" + + self.redpanda.logger.info( + f"Truncate offset: {truncate_offset} expected segments to remove: {expected_segments_removed}" + ) + return (truncate_offset, expected_segments_removed, high_watermark) + + (truncate_offset, expected_segments_removed, + high_watermark) = obtain_test_parameters(snapshot) + + # Make delete-records call, assert response looks ok + try: + low_watermark = self.delete_records(self.topic, 0, truncate_offset) + assert low_watermark == truncate_offset, f"Expected low watermark: {truncate_offset} observed: {low_watermark}" + except Exception as e: + topic_info = self.get_topic_info() + self.redpanda.logger.info( + f"Start offset: {topic_info.start_offset}") + raise e + + # Restart one node while the effect is propagating + node = random.choice(self.redpanda.nodes) + self.redpanda.signal_redpanda(node, signal=signal.SIGKILL) + self.redpanda.start_node(node) + + # Assert start offset is correct and there aren't any off-by-one errors + self.assert_new_partition_boundaries(low_watermark, high_watermark) + + try: + # All segments except for the current should be removed + self.redpanda.logger.info( + f"Waiting until {expected_segments_removed} segs are deleted") + wait_for_removal_of_n_segments(redpanda=self.redpanda, + topic=self.topic, + partition_idx=0, + n=expected_segments_removed, + original_snapshot=snapshot) + except ducktape.errors.TimeoutError as e: + # If there are some segments remaining, this is due to the fact that truncation + # occurred in the middle of the prefix_truncate call, so tombstones were written, + # but not persisted to disk; on restart these segments are orphaned. This is not + # a bug since on subsequent truncate calls, these segments will be removed.
self.redpanda.logger.info( + "Timed out waiting for segments, looking to see if leftover segments are segments that are to be recovered" + ) + + def failed_nodes_num_segments(node): + failed_node_storage = self.redpanda.node_storage(node) + segments = failed_node_storage.segments("kafka", self.topic, 0) + self.redpanda.logger.info(f"Failed node segments: {segments}") + return len(segments) + + expected_to_remain = len(snapshot) - expected_segments_removed + segments = failed_nodes_num_segments(node) + assert segments >= expected_to_remain + + # Delete the topic so post storage checks don't fail on inconsistencies across nodes + self.rpk.delete_topic(self.topic) + wait_until(lambda: failed_nodes_num_segments(node) == 0, + timeout_sec=10, + backoff_sec=2) + + @cluster(num_nodes=3) + def test_delete_records_bounds_checking(self): + """ + Ensure that the delete-records handler returns the appropriate error + code when a user attempts to truncate at an offset that is not within + the boundaries of the partition. + """ + num_records = 10240 + records_size = 512 + out_of_range_prefix = "OFFSET_OUT_OF_RANGE" + + # Produce some data, wait for it all to arrive + kafka_tools = KafkaCliTools(self.redpanda) + kafka_tools.produce(self.topic, num_records, records_size) + self.wait_until_records(num_records, timeout_sec=10, backoff_sec=1) + + def check_bad_response(response): + assert len(response) == 1 + assert response[0].topic == self.topic + assert response[0].partition == 0 + assert response[0].new_low_watermark == -1 + return response[0].error + + def bad_truncation(truncate_offset): + response = self.kcl.delete_records( + {self.topic: { + 0: truncate_offset + }}) + assert check_bad_response(response).startswith( + out_of_range_prefix + ), f"Unexpected error msg: {response[0].error}" + + # Try truncating past the end of the log + bad_truncation(num_records + 1) + + # Truncate, then attempt to truncate before the new beginning + truncate_offset = 125 + low_watermark = self.delete_records(self.topic, 0, truncate_offset) + assert low_watermark == truncate_offset + + # Try to truncate before and at the low_watermark + bad_truncation(0) + bad_truncation(low_watermark) + + # Try to truncate at a specific edge case where the start and end + # are 1 offset away from each other + truncate_offset = num_records - 2 + for t_ofs in range(truncate_offset, num_records + 1): + low_watermark = self.delete_records(self.topic, 0, t_ofs) + assert low_watermark == t_ofs + + # Assert that nothing is readable + bad_truncation(truncate_offset) + bad_truncation(num_records) + bad_truncation(num_records + 1) + + @cluster(num_nodes=3) + def test_delete_records_empty_or_missing_topic_or_partition(self): + missing_topic = "foo_bar" + out_of_range_prefix = "OFFSET_OUT_OF_RANGE" + unknown_topic_or_partition = "UNKNOWN_TOPIC_OR_PARTITION" + missing_partition = "partition was not returned" + + # Assert failure condition on unknown topic + response = self.kcl.delete_records({missing_topic: {0: 0}}) + assert len(response) == 1 + assert response[0].new_low_watermark == -1 + assert response[0].error.find(unknown_topic_or_partition) != -1 + + # Assert failure condition on unknown partition index + missing_idx = 15 + response = self.kcl.delete_records({self.topic: {missing_idx: 0}}) + assert len(response) == 1 + assert response[0].new_low_watermark == -1 + assert response[0].error.find(missing_partition) != -1 + + # Assert out of range occurs on an empty topic + attempts = 5 + response = [] + while attempts > 0: + response =
self.kcl.delete_records({self.topic: {0: 0}}) + assert len(response) == 1 + assert response[0].topic == self.topic + assert response[0].partition == 0 + assert response[0].new_low_watermark == -1 + if not response[0].error.startswith("NOT_LEADER"): + break + time.sleep(1) + attempts -= 1 + assert response[0].error.startswith(out_of_range_prefix) + + # Assert correct behavior on a topic with 1 record + kafka_tools = KafkaCliTools(self.redpanda) + kafka_tools.produce(self.topic, 1, 512) + self.wait_until_records(1, timeout_sec=5, backoff_sec=1) + response = self.kcl.delete_records({self.topic: {0: 0}}) + assert len(response) == 1 + assert response[0].new_low_watermark == -1 + assert response[0].error.startswith(out_of_range_prefix) + + # ... truncating at high watermark 1 should delete all data + low_watermark = self.delete_records(self.topic, 0, 1) + assert low_watermark == 1 + topic_info = self.get_topic_info() + assert topic_info.high_watermark == 1 + + @cluster(num_nodes=3) + def test_delete_records_with_transactions(self): + """ + Tests that the log_eviction_stm is respecting the max_collectible_offset + """ + payload = ''.join( + random.choice(string.ascii_letters) for _ in range(512)) + producer = ck.Producer({ + 'bootstrap.servers': self.redpanda.brokers(), + 'transactional.id': '0', + }) + producer.init_transactions() + + def delete_records_within_transaction(reporter): + try: + high_watermark = int(self.get_topic_info().high_watermark) + response = self.kcl.delete_records( + {self.topic: { + 0: high_watermark + }}, timeout_ms=5000) + assert len(response) == 1 + # Even though the on disk data may be late to evict, the start offset + # should have been immediately updated + assert response[0].new_low_watermark == high_watermark + assert response[0].error == 'OK' + except Exception as e: + reporter.exc = e + + # Write 20 records and leave the transaction open + producer.begin_transaction() + for idx in range(0, 20): + producer.produce(self.topic, '0', payload, 0) + producer.flush() + + # The eviction_stm will be re-queuing the event until the max collectible + # offset is moved ahead of the eviction offset, or until a timeout occurs + class ThreadReporter: + exc = None + + eviction_thread = threading.Thread( + target=delete_records_within_transaction, args=(ThreadReporter, )) + eviction_thread.start() + + # Committing the transaction will allow the eviction_stm to move forward + # and process the event. 
+ time.sleep(1) + producer.commit_transaction() + eviction_thread.join() + # Rethrow exception encountered in thread if observed + if ThreadReporter.exc is not None: + raise ThreadReporter.exc + + @cluster(num_nodes=5) + def test_delete_records_concurrent_truncations(self): + """ + This test verifies that issuing DeleteRecords requests with concurrent + producers and consumers has no effect on correctness + """ + + # Should complete producing within 20s + producer = KgoVerifierProducer(self._ctx, + self.redpanda, + self.topic, + msg_size=512, + msg_count=20000, + rate_limit_bps=500000) # 0.5 MB/s + consumer = KgoVerifierConsumerGroupConsumer(self._ctx, + self.redpanda, + self.topic, + 512, + 1, + max_msgs=20000) + + def periodic_delete_records(): + topic_info = self.get_topic_info() + start_offset = topic_info.start_offset + 1 + high_watermark = topic_info.high_watermark - 1 + if high_watermark - start_offset <= 0: + self.redpanda.logger.info("Waiting on log to populate") + return + truncate_point = random.randint(start_offset, high_watermark) + self.redpanda.logger.info( + f"Issuing delete_records request at offset: {truncate_point}") + response = self.kcl.delete_records( + {self.topic: { + 0: truncate_point + }}, timeout_ms=5000) + assert len(response) == 1 + assert response[0].new_low_watermark == truncate_point + assert response[0].error == 'OK' + # Cannot assert end boundaries as there is a concurrent producer + # moving the hwm forward + self.assert_start_partition_boundaries(truncate_point) + + def periodic_delete_records_loop(reporter, + iterations, + sleep_sec, + allowable_retrys=3): + """ + Periodically issue delete-records requests within a loop. allowable_retrys + exists so that the test doesn't automatically fail when a leadership change occurs. + The implementation guarantees that the special prefix_truncate batch is + replicated before the client is acked; however, it does not guarantee that the effect + has occurred. Have the test retry on some failures to account for this. + """ + try: + try: + while iterations > 0: + periodic_delete_records() + iterations -= 1 + time.sleep(sleep_sec) + except AssertionError as e: + if allowable_retrys == 0: + raise e + allowable_retrys = allowable_retrys - 1 + except Exception as e: + reporter.exc = e + + class ThreadReporter: + exc = None + + n_attempts = 10 + thread_sleep = 1 # 10s of uptime, less if exception thrown + delete_records_thread = threading.Thread( + target=periodic_delete_records_loop, + args=( + ThreadReporter, + n_attempts, + thread_sleep, + )) + + # Start up producer/consumer and thread that periodically issues delete records requests + delete_records_thread.start() + producer.start() + consumer.start() + + # Shut down all threads started above + self.redpanda.logger.info("Joining on delete-records thread") + delete_records_thread.join() + self.redpanda.logger.info("Joining on producer thread") + producer.wait() + self.redpanda.logger.info("Calling consumer::stop") + consumer.stop() + self.redpanda.logger.info("Joining on consumer thread") + consumer.wait() + if ThreadReporter.exc is not None: + raise ThreadReporter.exc + + status = consumer.consumer_status + assert status.validator.invalid_reads == 0 + assert status.validator.out_of_scope_invalid_reads == 0
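For illustration, a minimal usage sketch of the KCL.delete_records ducktape helper added in tests/rptest/clients/kcl.py; it is not part of the change set above, and it assumes a running ducktape redpanda service fixture plus a hypothetical topic "foo" whose partition 0 already holds records past offset 100:

from rptest.clients.kcl import KCL

def truncate_foo_example(redpanda):
    # Wrap the kcl binary against an already-started redpanda service fixture.
    kcl = KCL(redpanda)
    # Request prefix truncation of partition 0 of "foo" at offset 100, which
    # corresponds to: kcl admin delete-records foo:p0,o100
    responses = kcl.delete_records({"foo": {0: 100}}, timeout_ms=5000)
    for r in responses:
        # Each entry is a KclDeleteRecordsResponse namedtuple:
        # (broker, topic, partition, new_low_watermark, error)
        assert r.error == 'OK', f"delete-records failed: {r.error}"
        assert r.new_low_watermark == 100

As exercised by the bounds-checking test above, a request whose offset lies outside the partition's current bounds is reported through the error field (for example an OFFSET_OUT_OF_RANGE prefix) with new_low_watermark set to -1.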