Merge pull request #19161 from ballard26/fetch-opt
Optimize `replicated_partition::validate_fetch_offset`
piyushredpanda authored Jul 15, 2024
2 parents fd5f4ca + e96451a commit a4e54a3
Showing 6 changed files with 177 additions and 102 deletions.
53 changes: 43 additions & 10 deletions src/v/cluster/log_eviction_stm.cc
@@ -217,22 +217,54 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data)));
}

ss::future<result<model::offset, std::error_code>>
log_eviction_stm::sync_start_offset_override(
kafka::offset log_eviction_stm::kafka_start_offset_override() {
if (_cached_kafka_start_offset_override != kafka::offset{}) {
return _cached_kafka_start_offset_override;
}

// Since the STM doesn't snapshot `_cached_kafka_start_offset_override`, it's
// possible for it to be lost during restarts. Therefore the raft offset,
// which is snapshotted, will be translated if possible.
if (_delete_records_eviction_offset == model::offset{}) {
return kafka::offset{};
}

auto raft_start_offset_override = model::next_offset(
_delete_records_eviction_offset);

// This handles an edge case where the stm does not record raft offsets
// that never landed in local storage. Returning `kafka::offset{}` here
// indicates to the caller that the archival stm should be queried for the
// offset instead.
if (raft_start_offset_override <= _raft->start_offset()) {
return kafka::offset{};
}

_cached_kafka_start_offset_override = model::offset_cast(
_raft->log()->from_log_offset(raft_start_offset_override));

return _cached_kafka_start_offset_override;
}

ss::future<result<kafka::offset, std::error_code>>
log_eviction_stm::sync_kafka_start_offset_override(
model::timeout_clock::duration timeout) {
/// Call this method to ensure followers have processed up until the
/// most recent known version of the special batch. This is particularly
/// useful for knowing whether the start offset is up to date, for
/// example when leadership has recently changed.
auto term = _raft->term();
if (!co_await sync(timeout)) {
if (term != _raft->term()) {
co_return errc::not_leader;
} else {
co_return errc::timeout;
}
}
co_return start_offset_override();
return sync(timeout).then(
[this, term](bool success) -> result<kafka::offset, std::error_code> {
if (!success) {
if (term != _raft->term()) {
return errc::not_leader;
} else {
return errc::timeout;
}
}
return kafka_start_offset_override();
});
}

model::offset log_eviction_stm::effective_start_offset() const {
@@ -369,6 +401,7 @@ ss::future<> log_eviction_stm::apply(const model::record_batch& batch) {
}
const auto record = serde::from_iobuf<prefix_truncate_record>(
batch.copy_records().begin()->release_value());
_cached_kafka_start_offset_override = record.kafka_start_offset;
if (record.rp_start_offset == model::offset{}) {
// This may happen if the requested offset was not in the local log at
// time of replicating. We still need to have replicated it though so
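The rewrite of sync_kafka_start_offset_override above (and of persisted_stm::sync further down) replaces a co_await-based body with an explicit sync(timeout).then(...) continuation. The presumed motivation is that a coroutine typically heap-allocates a frame on every invocation, even when the awaited future is already ready, whereas a .then() continuation on a ready future avoids that overhead on hot paths such as fetch-offset validation. The following is an illustrative sketch of the two forms, not code from this commit; it only assumes Seastar headers are available:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

// Coroutine form: each call materializes a coroutine frame, even when `f`
// is already resolved.
seastar::future<int> add_one_coro(seastar::future<int> f) {
    auto v = co_await std::move(f);
    co_return v + 1;
}

// Continuation form: no coroutine frame; when `f` is already resolved the
// continuation can run immediately.
seastar::future<int> add_one_then(seastar::future<int> f) {
    return std::move(f).then([](int v) { return v + 1; });
}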
21 changes: 13 additions & 8 deletions src/v/cluster/log_eviction_stm.h
@@ -94,15 +94,17 @@ class log_eviction_stm
/// This only returns the start override, if one exists. It does not take
/// into account local storage, and may not even point to an offset that
/// exists in local storage (e.g. if we have locally truncated).
ss::future<offset_result>
sync_start_offset_override(model::timeout_clock::duration timeout);
///
/// If `kafka::offset{}` is returned and archival storage is enabled for the
/// given ntp then the caller should fall back on the archival stm to check
/// if a start offset override exists and if so what its value is.
ss::future<result<kafka::offset, std::error_code>>
sync_kafka_start_offset_override(model::timeout_clock::duration timeout);

model::offset start_offset_override() const {
if (_delete_records_eviction_offset == model::offset{}) {
return model::offset{};
}
return model::next_offset(_delete_records_eviction_offset);
}
/// If `kafka::offset{}` is returned and archival storage is enabled for the
/// given ntp then the caller should fall back on the archival stm to check
/// if a start offset override exists and if so what its value is.
kafka::offset kafka_start_offset_override();

ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

@@ -144,6 +146,9 @@ class log_eviction_stm

// Should be signaled every time either of the above offsets are updated.
ss::condition_variable _has_pending_truncation;

// Kafka offset of the last `prefix_truncate_record` applied to this stm.
kafka::offset _cached_kafka_start_offset_override;
};

class log_eviction_stm_factory : public state_machine_factory {
77 changes: 51 additions & 26 deletions src/v/cluster/partition.cc
@@ -1294,25 +1294,54 @@ partition::get_cloud_storage_manifest_view() {
ss::future<result<model::offset, std::error_code>>
partition::sync_kafka_start_offset_override(
model::timeout_clock::duration timeout) {
if (_log_eviction_stm && !is_read_replica_mode_enabled()) {
auto offset_res
= co_await _log_eviction_stm->sync_start_offset_override(timeout);
if (is_read_replica_mode_enabled()) {
auto term = _raft->term();
if (!co_await _archival_meta_stm->sync(timeout)) {
if (term != _raft->term()) {
co_return errc::not_leader;
} else {
co_return errc::timeout;
}
}
auto start_kafka_offset
= _archival_meta_stm->manifest().get_start_kafka_offset_override();

co_return kafka::offset_cast(start_kafka_offset);
}

if (_log_eviction_stm) {
auto offset_res = co_await _log_eviction_stm
->sync_kafka_start_offset_override(timeout);
if (offset_res.has_failure()) {
co_return offset_res.as_failure();
}
// The eviction STM only keeps track of DeleteRecords truncations
// as Raft offsets. Translate if possible.
if (
offset_res.value() != model::offset{}
&& _raft->start_offset() < offset_res.value()) {
auto start_kafka_offset = log()->from_log_offset(
offset_res.value());
co_return start_kafka_offset;
if (offset_res.value() != kafka::offset{}) {
co_return kafka::offset_cast(offset_res.value());
}
// If a start override is no longer in the offset translator state,
// it may have been uploaded and persisted in the manifest.
}
if (_archival_meta_stm) {

if (!_archival_meta_stm) {
co_return model::offset{};
}

// There are a few cases in which the log_eviction_stm will return a kafka
// offset of `kafka::offset{}` for the start offset override.
// - The topic was remotely recovered.
// - A start offset override was never set.
// - The broker has restarted and the log_eviction_stm couldn't recover the
// kafka offset for the start offset override.
//
// In all cases we'll need to fall back to the archival stm to figure out if
// a start offset override exists, and if so, what it is.
//
// For this we'll sync the archival stm a single time to ensure we have the
// most up-to-date manifest. From that point onwards the offset
// `_archival_meta_stm->manifest().get_start_kafka_offset_override()` will
// be correct without having to sync again. This is because the offset will
// not change until another offset override has been applied to the log
// eviction stm. And at that point the log eviction stm will be able to give
// us the correct offset override.
if (!_has_synced_archival_for_start_override) [[unlikely]] {
auto term = _raft->term();
if (!co_await _archival_meta_stm->sync(timeout)) {
if (term != _raft->term()) {
@@ -1321,13 +1350,12 @@ partition::sync_kafka_start_offset_override(
co_return errc::timeout;
}
}
auto start_kafka_offset
= _archival_meta_stm->manifest().get_start_kafka_offset_override();
if (start_kafka_offset != kafka::offset{}) {
co_return kafka::offset_cast(start_kafka_offset);
}
_has_synced_archival_for_start_override = true;
}
co_return model::offset{};

auto start_kafka_offset
= _archival_meta_stm->manifest().get_start_kafka_offset_override();
co_return kafka::offset_cast(start_kafka_offset);
}

model::offset partition::last_stable_offset() const {
@@ -1385,13 +1413,10 @@ partition::archival_meta_stm() const {

std::optional<model::offset> partition::kafka_start_offset_override() const {
if (_log_eviction_stm && !is_read_replica_mode_enabled()) {
auto o = _log_eviction_stm->start_offset_override();
if (o != model::offset{} && _raft->start_offset() < o) {
auto start_kafka_offset = log()->from_log_offset(o);
return start_kafka_offset;
auto o = _log_eviction_stm->kafka_start_offset_override();
if (o != kafka::offset{}) {
return kafka::offset_cast(o);
}
// If a start override is no longer in the offset translator state,
// it may have been uploaded and persisted in the manifest.
}
if (_archival_meta_stm) {
auto o
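The comment block in sync_kafka_start_offset_override above explains why a single successful archival sync is enough. A minimal, hypothetical sketch of that sync-once shape is shown below; the names (sync_once_reader, SyncFn, ReadFn) are illustrative and not part of the partition interface:

#include <cstdint>
#include <optional>
#include <seastar/core/future.hh>

// Illustrative only: `sync` stands in for the archival stm sync and `read`
// for reading the manifest's start offset override.
class sync_once_reader {
    bool _synced{false};

public:
    template<typename SyncFn, typename ReadFn>
    seastar::future<std::optional<int64_t>> get(SyncFn sync, ReadFn read) {
        if (_synced) {
            // Fast path: once one sync has succeeded, later callers read the
            // stored value without syncing again.
            return seastar::make_ready_future<std::optional<int64_t>>(read());
        }
        return sync().then([this, read](bool ok) -> std::optional<int64_t> {
            if (!ok) {
                // The caller maps this to not_leader/timeout.
                return std::nullopt;
            }
            _synced = true;
            return read();
        });
    }
};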
4 changes: 4 additions & 0 deletions src/v/cluster/partition.h
@@ -384,6 +384,10 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
ss::sharded<archival::upload_housekeeping_service>& _upload_housekeeping;
config::binding<model::cleanup_policy_bitflags> _log_cleanup_policy;

// Used in `sync_kafka_start_offset_override` to avoid having to re-sync the
// `archival_meta_stm`.
bool _has_synced_archival_for_start_override{false};

friend std::ostream& operator<<(std::ostream& o, const partition& x);
};
} // namespace cluster
102 changes: 54 additions & 48 deletions src/v/kafka/server/replicated_partition.cc
@@ -45,30 +45,32 @@ const model::ntp& replicated_partition::ntp() const {
ss::future<result<model::offset, error_code>>
replicated_partition::sync_effective_start(
model::timeout_clock::duration timeout) {
auto synced_start_offset_override
= co_await _partition->sync_kafka_start_offset_override(timeout);
if (synced_start_offset_override.has_failure()) {
auto err = synced_start_offset_override.error();
auto error_code = error_code::unknown_server_error;
if (err.category() == cluster::error_category()) {
switch (cluster::errc(err.value())) {
/**
* In the case of timeout and shutting down errors return
* not_leader_for_partition error to force clients to retry.
*/
case cluster::errc::shutting_down:
case cluster::errc::not_leader:
case cluster::errc::timeout:
error_code = error_code::not_leader_for_partition;
break;
default:
error_code = error_code::unknown_server_error;
}
}
co_return error_code;
}
co_return kafka_start_offset_with_override(
synced_start_offset_override.value());
return _partition->sync_kafka_start_offset_override(timeout).then(
[this](auto synced_start_offset_override)
-> result<model::offset, error_code> {
if (synced_start_offset_override.has_failure()) {
auto err = synced_start_offset_override.error();
auto error_code = error_code::unknown_server_error;
if (err.category() == cluster::error_category()) {
switch (cluster::errc(err.value())) {
/**
* In the case of timeout and shutting down errors return
* not_leader_for_partition error to force clients to retry.
*/
case cluster::errc::shutting_down:
case cluster::errc::not_leader:
case cluster::errc::timeout:
error_code = error_code::not_leader_for_partition;
break;
default:
error_code = error_code::unknown_server_error;
}
}
return error_code;
}
return kafka_start_offset_with_override(
synced_start_offset_override.value());
});
}

model::offset replicated_partition::start_offset() const {
@@ -575,36 +577,40 @@ ss::future<error_code> replicated_partition::validate_fetch_offset(
ec);
}

co_return ec;
return ss::make_ready_future<error_code>(ec);
}

// Grab the up to date start offset
auto timeout = deadline - model::timeout_clock::now();
auto start_offset = co_await sync_effective_start(timeout);
if (!start_offset) {
vlog(
klog.warn,
"ntp {}: error obtaining latest start offset - {}",
ntp(),
start_offset.error());
co_return start_offset.error();
}
return sync_effective_start(timeout).then(
[this, fetch_offset](auto start_offset) {
if (!start_offset) {
vlog(
klog.warn,
"ntp {}: error obtaining latest start offset - {}",
ntp(),
start_offset.error());
return start_offset.error();
}

if (
fetch_offset < start_offset.value() || fetch_offset > log_end_offset()) {
vlog(
klog.warn,
"ntp {}: fetch offset_out_of_range on leader, requested: {}, "
"partition start offset: {}, high watermark: {}, log end offset: {}",
ntp(),
fetch_offset,
start_offset.value(),
high_watermark(),
log_end_offset());
co_return error_code::offset_out_of_range;
}
if (
fetch_offset < start_offset.value()
|| fetch_offset > log_end_offset()) {
vlog(
klog.warn,
"ntp {}: fetch offset_out_of_range on leader, requested: {}, "
"partition start offset: {}, high watermark: {}, log end "
"offset: {}",
ntp(),
fetch_offset,
start_offset.value(),
high_watermark(),
log_end_offset());
return error_code::offset_out_of_range;
}

co_return error_code::none;
return error_code::none;
});
}

result<partition_info> replicated_partition::get_partition_info() const {
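The range rule enforced above: a fetch offset is accepted only when it lies between the (possibly overridden) partition start offset and the log end offset, inclusive; anything else is answered with offset_out_of_range. A trivial standalone restatement, using raw int64_t instead of model::offset purely for illustration:

#include <cstdint>

// Mirrors the check in validate_fetch_offset: out of range when the fetch
// offset is below the effective start offset or beyond the log end offset.
inline bool fetch_offset_in_range(
  int64_t fetch_offset, int64_t start_offset, int64_t log_end_offset) {
    return fetch_offset >= start_offset && fetch_offset <= log_end_offset;
}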
22 changes: 12 additions & 10 deletions src/v/raft/persisted_stm.cc
@@ -492,16 +492,16 @@ ss::future<bool>
persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
auto term = _raft->term();
if (!_raft->is_leader()) {
co_return false;
return ss::make_ready_future<bool>(false);
}
if (_insync_term == term) {
co_return true;
return ss::make_ready_future<bool>(true);
}
if (_is_catching_up) {
auto deadline = model::timeout_clock::now() + timeout;
auto sync_waiter = ss::make_lw_shared<expiring_promise<bool>>();
_sync_waiters.push_back(sync_waiter);
co_return co_await sync_waiter->get_future_with_timeout(
return sync_waiter->get_future_with_timeout(
deadline, [] { return false; });
}
_is_catching_up = true;
@@ -527,14 +527,16 @@ persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
// of the term yet.
sync_offset = log_offsets.dirty_offset;
}
auto is_synced = co_await do_sync(timeout, sync_offset, term);

_is_catching_up = false;
for (auto& sync_waiter : _sync_waiters) {
sync_waiter->set_value(is_synced);
}
_sync_waiters.clear();
co_return is_synced;
return do_sync(timeout, sync_offset, term).then([this](bool is_synced) {
_is_catching_up = false;
for (auto& sync_waiter : _sync_waiters) {
sync_waiter->set_value(is_synced);
}
_sync_waiters.clear();

return is_synced;
});
}

template<supported_stm_snapshot T>
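The rewritten persisted_stm::sync keeps the existing waiter-coalescing behaviour: while one caller performs the catch-up, concurrent callers park on a promise and are completed with the same result once do_sync finishes. A stripped-down sketch of that shape follows; the names are hypothetical, and the real code uses expiring_promise so that waiters can also time out (error handling is elided here):

#include <seastar/core/future.hh>
#include <vector>

class sync_coalescer {
    bool _in_flight{false};
    std::vector<seastar::promise<bool>> _waiters;

public:
    // `do_sync` is the single expensive catch-up operation; concurrent
    // callers share its outcome instead of starting their own.
    template<typename Fn>
    seastar::future<bool> run(Fn do_sync) {
        if (_in_flight) {
            _waiters.emplace_back();
            return _waiters.back().get_future();
        }
        _in_flight = true;
        return do_sync().then([this](bool ok) {
            _in_flight = false;
            for (auto& w : _waiters) {
                w.set_value(ok);
            }
            _waiters.clear();
            return ok;
        });
    }
};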
