diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc
index 7f08c59552a3..59c5b1257d08 100644
--- a/src/v/cluster/log_eviction_stm.cc
+++ b/src/v/cluster/log_eviction_stm.cc
@@ -217,22 +217,54 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
       raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data)));
 }
 
-ss::future<result<model::offset, std::error_code>>
-log_eviction_stm::sync_start_offset_override(
+kafka::offset log_eviction_stm::kafka_start_offset_override() {
+    if (_cached_kafka_start_offset_override != kafka::offset{}) {
+        return _cached_kafka_start_offset_override;
+    }
+
+    // Since the STM doesn't snapshot `_cached_kafka_start_offset_override`,
+    // it's possible for it to be lost during restarts. Therefore the raft
+    // offset, which is snapshotted, will be translated if possible.
+    if (_delete_records_eviction_offset == model::offset{}) {
+        return kafka::offset{};
+    }
+
+    auto raft_start_offset_override = model::next_offset(
+      _delete_records_eviction_offset);
+
+    // This handles an edge case where the stm will not record any raft
+    // offsets that do not land in local storage. Hence returning
+    // `kafka::offset{}` indicates to the caller that the archival stm
+    // should be queried for the offset instead.
+    if (raft_start_offset_override <= _raft->start_offset()) {
+        return kafka::offset{};
+    }
+
+    _cached_kafka_start_offset_override = model::offset_cast(
+      _raft->log()->from_log_offset(raft_start_offset_override));
+
+    return _cached_kafka_start_offset_override;
+}
+
+ss::future<result<kafka::offset, std::error_code>>
+log_eviction_stm::sync_kafka_start_offset_override(
   model::timeout_clock::duration timeout) {
     /// Call this method to ensure followers have processed up until the
     /// most recent known version of the special batch. This is particularly
     /// useful to know if the start offset is up to date in the case
     /// leadership has recently changed for example.
     auto term = _raft->term();
-    if (!co_await sync(timeout)) {
-        if (term != _raft->term()) {
-            co_return errc::not_leader;
-        } else {
-            co_return errc::timeout;
-        }
-    }
-    co_return start_offset_override();
+    return sync(timeout).then(
+      [this, term](bool success) -> result<kafka::offset, std::error_code> {
+          if (!success) {
+              if (term != _raft->term()) {
+                  return errc::not_leader;
+              } else {
+                  return errc::timeout;
+              }
+          }
+          return kafka_start_offset_override();
+      });
 }
 
 model::offset log_eviction_stm::effective_start_offset() const {
@@ -369,6 +401,7 @@ ss::future<> log_eviction_stm::apply(const model::record_batch& batch) {
     }
     const auto record = serde::from_iobuf<prefix_truncate_record>(
       batch.copy_records().begin()->release_value());
+    _cached_kafka_start_offset_override = record.kafka_start_offset;
     if (record.rp_start_offset == model::offset{}) {
         // This may happen if the requested offset was not in the local log at
         // time of replicating. We still need to have replicated it though so
diff --git a/src/v/cluster/log_eviction_stm.h b/src/v/cluster/log_eviction_stm.h
index a992a0675200..0338bf240baf 100644
--- a/src/v/cluster/log_eviction_stm.h
+++ b/src/v/cluster/log_eviction_stm.h
@@ -94,15 +94,17 @@ class log_eviction_stm
     /// This only returns the start override, if one exists. It does not take
     /// into account local storage, and may not even point to an offset that
     /// exists in local storage (e.g. if we have locally truncated).
-    ss::future<result<model::offset, std::error_code>>
-    sync_start_offset_override(model::timeout_clock::duration timeout);
+    ///
+    /// If `kafka::offset{}` is returned and archival storage is enabled for the
+    /// given ntp then the caller should fall back on the archival stm to check
+    /// if a start offset override exists and if so what its value is.
+    ss::future<result<kafka::offset, std::error_code>>
+    sync_kafka_start_offset_override(model::timeout_clock::duration timeout);
 
-    model::offset start_offset_override() const {
-        if (_delete_records_eviction_offset == model::offset{}) {
-            return model::offset{};
-        }
-        return model::next_offset(_delete_records_eviction_offset);
-    }
+    /// If `kafka::offset{}` is returned and archival storage is enabled for the
+    /// given ntp then the caller should fall back on the archival stm to check
+    /// if a start offset override exists and if so what its value is.
+    kafka::offset kafka_start_offset_override();
 
     ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }
 
@@ -144,6 +146,9 @@ class log_eviction_stm
 
     // Should be signaled every time either of the above offsets are updated.
     ss::condition_variable _has_pending_truncation;
+
+    // Kafka offset of the last `prefix_truncate_record` applied to this stm.
+    kafka::offset _cached_kafka_start_offset_override;
 };
 
 class log_eviction_stm_factory : public state_machine_factory {
diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc
index b26de5635ce7..f8d71a7a11a2 100644
--- a/src/v/cluster/partition.cc
+++ b/src/v/cluster/partition.cc
@@ -1294,25 +1294,54 @@ partition::get_cloud_storage_manifest_view() {
 ss::future<result<model::offset, std::error_code>>
 partition::sync_kafka_start_offset_override(
   model::timeout_clock::duration timeout) {
-    if (_log_eviction_stm && !is_read_replica_mode_enabled()) {
-        auto offset_res
-          = co_await _log_eviction_stm->sync_start_offset_override(timeout);
+    if (is_read_replica_mode_enabled()) {
+        auto term = _raft->term();
+        if (!co_await _archival_meta_stm->sync(timeout)) {
+            if (term != _raft->term()) {
+                co_return errc::not_leader;
+            } else {
+                co_return errc::timeout;
+            }
+        }
+        auto start_kafka_offset
+          = _archival_meta_stm->manifest().get_start_kafka_offset_override();
+
+        co_return kafka::offset_cast(start_kafka_offset);
+    }
+
+    if (_log_eviction_stm) {
+        auto offset_res = co_await _log_eviction_stm
+                            ->sync_kafka_start_offset_override(timeout);
         if (offset_res.has_failure()) {
             co_return offset_res.as_failure();
         }
-        // The eviction STM only keeps track of DeleteRecords truncations
-        // as Raft offsets. Translate if possible.
-        if (
-          offset_res.value() != model::offset{}
-          && _raft->start_offset() < offset_res.value()) {
-            auto start_kafka_offset = log()->from_log_offset(
-              offset_res.value());
-            co_return start_kafka_offset;
+        if (offset_res.value() != kafka::offset{}) {
+            co_return kafka::offset_cast(offset_res.value());
         }
-        // If a start override is no longer in the offset translator state,
-        // it may have been uploaded and persisted in the manifest.
     }
-    if (_archival_meta_stm) {
+
+    if (!_archival_meta_stm) {
+        co_return model::offset{};
+    }
+
+    // There are a few cases in which the log_eviction_stm will return a kafka
+    // offset of `kafka::offset{}` for the start offset override:
+    // - The topic was remotely recovered.
+    // - A start offset override was never set.
+    // - The broker has restarted and the log_eviction_stm couldn't recover the
+    //   kafka offset for the start offset override.
+    //
+    // In all cases we'll need to fall back to the archival stm to figure out if
+    // a start offset override exists, and if so, what it is.
+    //
+    // For this we'll sync the archival stm a single time to ensure we have the
+    // most up-to-date manifest. From that point onwards the offset
+    // `_archival_meta_stm->manifest().get_start_kafka_offset_override()` will
+    // be correct without having to sync again. This is because the offset will
+    // not change until another offset override has been applied to the log
+    // eviction stm, and at that point the log eviction stm will be able to
+    // give us the correct offset override.
+    if (!_has_synced_archival_for_start_override) [[unlikely]] {
         auto term = _raft->term();
         if (!co_await _archival_meta_stm->sync(timeout)) {
             if (term != _raft->term()) {
@@ -1321,13 +1350,12 @@ partition::sync_kafka_start_offset_override(
                 co_return errc::timeout;
             }
         }
-        auto start_kafka_offset
-          = _archival_meta_stm->manifest().get_start_kafka_offset_override();
-        if (start_kafka_offset != kafka::offset{}) {
-            co_return kafka::offset_cast(start_kafka_offset);
-        }
+        _has_synced_archival_for_start_override = true;
     }
-    co_return model::offset{};
+
+    auto start_kafka_offset
+      = _archival_meta_stm->manifest().get_start_kafka_offset_override();
+    co_return kafka::offset_cast(start_kafka_offset);
 }
 
 model::offset partition::last_stable_offset() const {
@@ -1385,13 +1413,10 @@ partition::archival_meta_stm() const {
 
 std::optional<model::offset> partition::kafka_start_offset_override() const {
     if (_log_eviction_stm && !is_read_replica_mode_enabled()) {
-        auto o = _log_eviction_stm->start_offset_override();
-        if (o != model::offset{} && _raft->start_offset() < o) {
-            auto start_kafka_offset = log()->from_log_offset(o);
-            return start_kafka_offset;
+        auto o = _log_eviction_stm->kafka_start_offset_override();
+        if (o != kafka::offset{}) {
+            return kafka::offset_cast(o);
         }
-        // If a start override is no longer in the offset translator state,
-        // it may have been uploaded and persisted in the manifest.
     }
     if (_archival_meta_stm) {
         auto o
diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h
index ff959210d0d8..bc61043b56dc 100644
--- a/src/v/cluster/partition.h
+++ b/src/v/cluster/partition.h
@@ -384,6 +384,10 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
     ss::sharded<archival::upload_housekeeping_service>& _upload_housekeeping;
     config::binding<model::cleanup_policy_bitflags> _log_cleanup_policy;
 
+    // Used in `sync_kafka_start_offset_override` to avoid having to re-sync the
+    // `archival_meta_stm`.
+    bool _has_synced_archival_for_start_override{false};
+
     friend std::ostream& operator<<(std::ostream& o, const partition& x);
 };
 } // namespace cluster
diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc
index 77536cba6d30..0840eeb6cf6b 100644
--- a/src/v/kafka/server/replicated_partition.cc
+++ b/src/v/kafka/server/replicated_partition.cc
@@ -45,30 +45,32 @@ const model::ntp& replicated_partition::ntp() const {
 ss::future<result<model::offset, error_code>>
 replicated_partition::sync_effective_start(
   model::timeout_clock::duration timeout) {
-    auto synced_start_offset_override
-      = co_await _partition->sync_kafka_start_offset_override(timeout);
-    if (synced_start_offset_override.has_failure()) {
-        auto err = synced_start_offset_override.error();
-        auto error_code = error_code::unknown_server_error;
-        if (err.category() == cluster::error_category()) {
-            switch (cluster::errc(err.value())) {
-            /**
-             * In the case of timeout and shutting down errors return
-             * not_leader_for_partition error to force clients retry.
- */ - case cluster::errc::shutting_down: - case cluster::errc::not_leader: - case cluster::errc::timeout: - error_code = error_code::not_leader_for_partition; - break; - default: - error_code = error_code::unknown_server_error; - } - } - co_return error_code; - } - co_return kafka_start_offset_with_override( - synced_start_offset_override.value()); + return _partition->sync_kafka_start_offset_override(timeout).then( + [this](auto synced_start_offset_override) + -> result { + if (synced_start_offset_override.has_failure()) { + auto err = synced_start_offset_override.error(); + auto error_code = error_code::unknown_server_error; + if (err.category() == cluster::error_category()) { + switch (cluster::errc(err.value())) { + /** + * In the case of timeout and shutting down errors return + * not_leader_for_partition error to force clients retry. + */ + case cluster::errc::shutting_down: + case cluster::errc::not_leader: + case cluster::errc::timeout: + error_code = error_code::not_leader_for_partition; + break; + default: + error_code = error_code::unknown_server_error; + } + } + return error_code; + } + return kafka_start_offset_with_override( + synced_start_offset_override.value()); + }); } model::offset replicated_partition::start_offset() const { @@ -575,36 +577,40 @@ ss::future replicated_partition::validate_fetch_offset( ec); } - co_return ec; + return ss::make_ready_future(ec); } // Grab the up to date start offset auto timeout = deadline - model::timeout_clock::now(); - auto start_offset = co_await sync_effective_start(timeout); - if (!start_offset) { - vlog( - klog.warn, - "ntp {}: error obtaining latest start offset - {}", - ntp(), - start_offset.error()); - co_return start_offset.error(); - } + return sync_effective_start(timeout).then( + [this, fetch_offset](auto start_offset) { + if (!start_offset) { + vlog( + klog.warn, + "ntp {}: error obtaining latest start offset - {}", + ntp(), + start_offset.error()); + return start_offset.error(); + } - if ( - fetch_offset < start_offset.value() || fetch_offset > log_end_offset()) { - vlog( - klog.warn, - "ntp {}: fetch offset_out_of_range on leader, requested: {}, " - "partition start offset: {}, high watermark: {}, log end offset: {}", - ntp(), - fetch_offset, - start_offset.value(), - high_watermark(), - log_end_offset()); - co_return error_code::offset_out_of_range; - } + if ( + fetch_offset < start_offset.value() + || fetch_offset > log_end_offset()) { + vlog( + klog.warn, + "ntp {}: fetch offset_out_of_range on leader, requested: {}, " + "partition start offset: {}, high watermark: {}, log end " + "offset: {}", + ntp(), + fetch_offset, + start_offset.value(), + high_watermark(), + log_end_offset()); + return error_code::offset_out_of_range; + } - co_return error_code::none; + return error_code::none; + }); } result replicated_partition::get_partition_info() const { diff --git a/src/v/raft/persisted_stm.cc b/src/v/raft/persisted_stm.cc index f7a62f9ceeea..d15ad1428692 100644 --- a/src/v/raft/persisted_stm.cc +++ b/src/v/raft/persisted_stm.cc @@ -492,16 +492,16 @@ ss::future persisted_stm::sync(model::timeout_clock::duration timeout) { auto term = _raft->term(); if (!_raft->is_leader()) { - co_return false; + return ss::make_ready_future(false); } if (_insync_term == term) { - co_return true; + return ss::make_ready_future(true); } if (_is_catching_up) { auto deadline = model::timeout_clock::now() + timeout; auto sync_waiter = ss::make_lw_shared>(); _sync_waiters.push_back(sync_waiter); - co_return co_await 
+        return sync_waiter->get_future_with_timeout(
           deadline, [] { return false; });
     }
     _is_catching_up = true;
@@ -527,14 +527,16 @@ persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
         // of the term yet.
         sync_offset = log_offsets.dirty_offset;
     }
-    auto is_synced = co_await do_sync(timeout, sync_offset, term);
-    _is_catching_up = false;
-    for (auto& sync_waiter : _sync_waiters) {
-        sync_waiter->set_value(is_synced);
-    }
-    _sync_waiters.clear();
-    co_return is_synced;
+    return do_sync(timeout, sync_offset, term).then([this](bool is_synced) {
+        _is_catching_up = false;
+        for (auto& sync_waiter : _sync_waiters) {
+            sync_waiter->set_value(is_synced);
+        }
+        _sync_waiters.clear();
+
+        return is_synced;
+    });
 }
 
 template