Merge pull request #12470 from mmaslankaprv/log-eviction-stm
Changed implementation of log eviction stm to use a queue
piyushredpanda authored Jul 27, 2023
2 parents 114da28 + 64c31ab commit 777b8e0
Showing 3 changed files with 158 additions and 105 deletions.
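The patch replaces the condition-variable wake-ups in write_raft_snapshots_in_background() with a bounded event queue: apply() and the storage eviction monitor enqueue eviction events, and a single background fiber (handle_log_eviction_events()) pops each event only once the corresponding raft snapshot has been written, retrying with a backoff otherwise. The following is a minimal plain-C++ analogue of that producer/consumer shape, for illustration only: event_queue and its members are hypothetical stand-ins, and the real code uses Seastar's ss::queue with cooperative fibers rather than OS threads.

#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

struct eviction_event {
    int64_t prefix_truncate_offset;
    bool wait_for_success; // retry until the truncation actually happens
};

class event_queue {
    std::queue<eviction_event> q_;
    std::mutex m_;
    std::condition_variable cv_;
    bool stopped_ = false;

public:
    void push(eviction_event ev) {
        {
            std::lock_guard<std::mutex> lk(m_);
            q_.push(ev);
        }
        cv_.notify_one();
    }

    // Blocks until an event is available or stop() was called.
    std::optional<eviction_event> front_wait() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [&] { return stopped_ || !q_.empty(); });
        if (q_.empty()) {
            return std::nullopt;
        }
        return q_.front();
    }

    // The queue may already have been drained on stop, hence the emptiness
    // check (mirrors maybe_pop_queue() in the patch).
    void pop() {
        std::lock_guard<std::mutex> lk(m_);
        if (!q_.empty()) {
            q_.pop();
        }
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stopped_ = true;
        }
        cv_.notify_all();
    }
};

int main() {
    event_queue q;
    std::thread consumer([&] {
        while (auto ev = q.front_wait()) {
            std::cout << "truncate at offset " << ev->prefix_truncate_offset
                      << ", retry until success: " << ev->wait_for_success
                      << '\n';
            q.pop(); // the real stm only pops once truncation has succeeded
        }
    });
    q.push(eviction_event{42, true});
    q.stop();
    consumer.join();
}

Keeping the event at the head of the queue until truncation succeeds is what lets a delete-records event (wait_for_success == yes) survive transient failures, while storage-triggered events (wait_for_success == no) are popped after a single attempt.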
225 changes: 127 additions & 98 deletions src/v/cluster/log_eviction_stm.cc
@@ -17,6 +17,7 @@
#include "utils/gate_guard.h"

#include <seastar/core/future-util.hh>
#include <seastar/core/sleep.hh>

namespace cluster {

@@ -40,82 +41,130 @@ log_eviction_stm::log_eviction_stm(
ss::abort_source& as,
storage::kvstore& kvstore)
: persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore)
, _logger(logger)
, _as(as) {}
, _as(as)
, _queue(max_event_queue_size) {}

ss::future<> log_eviction_stm::start() {
ssx::spawn_with_gate(_gate, [this] { return monitor_log_eviction(); });
ssx::spawn_with_gate(
_gate, [this] { return write_raft_snapshots_in_background(); });
_gate, [this] { return handle_log_eviction_events(); });
return persisted_stm::start();
}

ss::future<> log_eviction_stm::stop() {
_reap_condition.broken();
_queue_mutex.broken();
_queue.abort(std::make_exception_ptr(ss::abort_requested_exception()));
co_await persisted_stm::stop();
}

ss::future<> log_eviction_stm::write_raft_snapshots_in_background() {
ss::future<> log_eviction_stm::enqueue_eviction_event(
model::offset offset, retry_event_until_success wait) {
vlog(
_log.trace,
"log eviction event at offset: {}, waiting for truncation: {}",
offset,
wait);

auto u = co_await _queue_mutex.get_units();
co_await _queue.push_eventually(eviction_event{
.prefix_truncate_offset = offset, .wait_for_success = wait});
}

ss::future<> log_eviction_stm::handle_log_eviction_events() {
static constexpr auto retry_backoff_time = 5s;
/// This method is executed as a background fiber and it attempts to write
/// snapshots as close to effective_start_offset as possible.
auto gh = _gate.hold();
bool wait_with_timeout = false;

while (!_as.abort_requested()) {
if (_raft->stopped()) {
_queue_mutex.broken();
_queue.abort(
std::make_exception_ptr(ss::abort_requested_exception()));
break;
}
/// This background fiber can be woken up via apply() when special
/// batches are processed or by the storage layer when local
/// eviction is triggered.
try {
static const auto eviction_event_wait_time = 5s;
if (wait_with_timeout) {
/// If the last call to \ref do_write_snapshot() has not
/// truncated up to the desired point, it is OK to
/// iterate again after some time and retry. Maybe
/// max_collectible_offset has moved forward.
co_await _reap_condition.wait(eviction_event_wait_time);
} else {
/// Last truncation had completed with success, there is no need
/// to wait for any other condition than for notify() to be
/// called.
co_await _reap_condition.wait();
}
} catch (const ss::condition_variable_timed_out&) {
/// There is still more data to truncate
co_await _queue.not_empty();
} catch (const ss::abort_requested_exception&) {
break;
} catch (const ss::broken_condition_variable&) {
/// stop() has been called
break;
} catch (const ss::broken_semaphore&) {
/// stop() has been called
break;
}

auto event = _queue.front();
vlog(
_log.trace,
"processing prefix truncation event with offset: {}, waiting for "
"truncate: {}",
event.prefix_truncate_offset,
event.wait_for_success);

auto evict_until = std::max(
_delete_records_eviction_offset, _storage_eviction_offset);
auto index_lb = _raft->log().index_lower_bound(evict_until);
if (!index_lb) {
wait_with_timeout = false;
_delete_records_eviction_offset, event.prefix_truncate_offset);
auto truncation_point = _raft->log().index_lower_bound(evict_until);
if (!truncation_point) {
vlog(
_log.warn,
"unable to find index lower bound for {}",
evict_until);
if (
!event.wait_for_success
|| _raft->last_snapshot_index() >= evict_until) {
maybe_pop_queue();
} else {
co_await ss::sleep_abortable(retry_backoff_time, _as);
}
continue;
}

vassert(
index_lb <= evict_until,
truncation_point <= evict_until,
"Calculated boundary {} must be <= effective_start {} ",
index_lb,
truncation_point,
evict_until);
try {
co_await do_write_raft_snapshot(*index_lb);
co_await do_write_raft_snapshot(*truncation_point);
if (
!event.wait_for_success
|| _raft->last_snapshot_index() >= truncation_point) {
maybe_pop_queue();
} else {
vlog(
_log.debug,
"haven't yet been able to truncate at {} offset, waiting for "
"next retry",
event.prefix_truncate_offset);
co_await ss::sleep_abortable(retry_backoff_time, _as);
}
} catch (const ss::abort_requested_exception&) {
// ignore abort requested exception, shutting down
} catch (const ss::gate_closed_exception&) {
// ignore gate closed exception, shutting down
} catch (const ss::broken_semaphore&) {
// ignore broken semaphore exception, shutting down
} catch (const ss::sleep_aborted&) {
// ignore aborted sleep, shutting down
} catch (const std::exception& e) {
vlog(
_logger.error,
"Error occurred when attempting to write snapshot: "
"{}, ntp: {}",
e,
_raft->ntp());
_log.error,
"Error occurred when attempting to write snapshot: {}",
e);
}
wait_with_timeout = _raft->last_snapshot_index() < index_lb;
}
}

void log_eviction_stm::maybe_pop_queue() {
// Since the queue is emptied when stopped, we must check if it is empty
if (!_queue.empty()) {
_queue.pop();
}
}
ss::future<> log_eviction_stm::monitor_log_eviction() {
/// This method is executed as a background fiber and is listening for
/// eviction events from the storage layer. These events will trigger a
@@ -125,59 +174,52 @@ ss::future<> log_eviction_stm::monitor_log_eviction() {
try {
_storage_eviction_offset = co_await _raft->monitor_log_eviction(
_as);
const auto max_collectible_offset
= _raft->log().stm_manager()->max_collectible_offset();
const auto next_eviction_offset = std::min(
max_collectible_offset, _storage_eviction_offset);
_reap_condition.signal();
/// Do not attempt to process another eviction event from storage
/// until the current has completed fully
co_await _last_snapshot_monitor.wait(
next_eviction_offset, model::no_timeout, _as);
co_await enqueue_eviction_event(
_storage_eviction_offset, retry_event_until_success::no);
} catch (const ss::abort_requested_exception&) {
// ignore abort requested exception, shutting down
} catch (const ss::gate_closed_exception&) {
// ignore gate closed exception, shutting down
} catch (const std::exception& e) {
vlog(
_logger.info,
"Error handling log eviction - {}, ntp: {}",
e,
_raft->ntp());
vlog(_log.info, "Error handling log eviction - {}", e);
}
}
}

ss::future<> log_eviction_stm::do_write_raft_snapshot(model::offset index_lb) {
if (index_lb <= _raft->last_snapshot_index()) {
ss::future<>
log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
vlog(
_log.trace,
"requested to write raft snapshot (prefix_truncate) at {}",
truncation_point);
if (truncation_point <= _raft->last_snapshot_index()) {
co_return;
}
co_await _raft->visible_offset_monitor().wait(
index_lb, model::no_timeout, _as);
truncation_point, model::no_timeout, _as);
co_await _raft->refresh_commit_index();
co_await _raft->log().stm_manager()->ensure_snapshot_exists(index_lb);
co_await _raft->log().stm_manager()->ensure_snapshot_exists(
truncation_point);
const auto max_collectible_offset
= _raft->log().stm_manager()->max_collectible_offset();
if (index_lb > max_collectible_offset) {
index_lb = max_collectible_offset;
if (index_lb <= _raft->last_snapshot_index()) {
if (truncation_point > max_collectible_offset) {
truncation_point = max_collectible_offset;
if (truncation_point <= _raft->last_snapshot_index()) {
/// Cannot truncate, have already reached the maximum allowable offset
co_return;
}
vlog(
_logger.trace,
"Can only evict up to offset: {}, asked to evict to: {} ntp: {}",
_log.trace,
"Can only evict up to offset: {}, asked to evict to: {} ",
max_collectible_offset,
index_lb,
_raft->ntp());
truncation_point);
}
vlog(
_logger.debug,
"Truncating data up until offset: {} for ntp: {}",
index_lb,
_raft->ntp());
co_await _raft->write_snapshot(raft::write_snapshot_cfg(index_lb, iobuf()));
_last_snapshot_monitor.notify(index_lb);
_log.debug,
"Requesting raft snapshot with final offset: {}",
truncation_point);
co_await _raft->write_snapshot(
raft::write_snapshot_cfg(truncation_point, iobuf()));
}

ss::future<result<model::offset, std::error_code>>
@@ -234,21 +276,19 @@ ss::future<log_eviction_stm::offset_result> log_eviction_stm::truncate(
/// After command replication all that can be guaranteed is that the command
/// was replicated
vlog(
_logger.info,
_log.info,
"Replicating prefix_truncate command, redpanda start offset: {}, kafka "
"start offset: {} "
"current last snapshot offset: {}, current last visible offset: {} for "
"ntp: {}",
"current last snapshot offset: {}, current last visible offset: {}",
val.rp_start_offset,
val.kafka_start_offset,
_raft->last_snapshot_index(),
_raft->last_visible_index(),
_raft->ntp());
_raft->last_visible_index());

auto res = co_await replicate_command(std::move(batch), deadline, as);
if (res.has_failure()) {
vlog(
_logger.info,
_log.info,
"Failed to observe replicated command in log, reason: {}",
res.error().message());
co_return res.as_failure();
Expand Down Expand Up @@ -281,7 +321,7 @@ ss::future<log_eviction_stm::offset_result> log_eviction_stm::replicate_command(

if (!result) {
vlog(
_logger.info,
_log.info,
"Failed to replicate prefix_truncate command, reason: {}",
result.error());
co_return result.error();
Expand Down Expand Up @@ -318,9 +358,8 @@ ss::future<> log_eviction_stm::apply(model::record_batch batch) {
batch.copy_records().begin()->release_key());
if (batch_type != prefix_truncate_key) {
vlog(
_logger.error,
"Unknown prefix_truncate batch type for {} at offset {}: {}",
_raft->ntp(),
_log.error,
"Unknown prefix_truncate batch type at offset {}: {}",
batch.header().base_offset(),
batch_type);
co_return;
@@ -332,26 +371,25 @@ ss::future<> log_eviction_stm::apply(model::record_batch batch) {
// time of replicating. We still need to have replicated it though so
// other STMs can honor it (e.g. archival).
vlog(
_logger.info,
"Replicated prefix_truncate batch for {} with no local redpanda "
_log.info,
"Replicated prefix_truncate batch with no local redpanda "
"offset. Requested start Kafka offset {}",
_raft->ntp(),
record.kafka_start_offset);
co_return;
}
auto truncate_offset = record.rp_start_offset - model::offset(1);
if (truncate_offset > _delete_records_eviction_offset) {
vlog(
_logger.debug,
"Moving local to truncate_point: {} last_applied: {} ntp: {}",
_log.info,
"Applying prefix truncate batch with truncate offset: {} "
"last_applied: {}",
truncate_offset,
last_applied_offset(),
_raft->ntp());
last_applied_offset());

/// Set the new in memory start offset
_delete_records_eviction_offset = truncate_offset;
/// Wake up the background reaping thread
_reap_condition.signal();
co_await enqueue_eviction_event(
truncate_offset, retry_event_until_success::yes);
/// Writing a local snapshot is just an optimization, delete-records
/// is infrequently called and there's no better time to persist the
/// fact that a new start offset has been written to disk
Expand Down Expand Up @@ -380,21 +418,16 @@ ss::future<> log_eviction_stm::handle_raft_snapshot() {
_storage_eviction_offset = last_snapshot_index;
set_next(model::next_offset(last_snapshot_index));
vlog(
_logger.info,
"Handled log eviction new effective start offset: {} for ntp: {}",
effective_start_offset(),
_c->ntp());
_log.info,
"Handled log eviction new effective start offset: {}",
effective_start_offset());
}

ss::future<>
log_eviction_stm::apply_snapshot(stm_snapshot_header header, iobuf&& data) {
auto snapshot = serde::from_iobuf<snapshot_data>(std::move(data));
vlog(
_logger.info,
"Applying snapshot {} at offset: {} for ntp: {}",
snapshot,
header.offset,
_raft->ntp());
_log.info, "Applying snapshot {} at offset: {}", snapshot, header.offset);

_delete_records_eviction_offset = snapshot.effective_start_offset;
_last_snapshot_offset = header.offset;
@@ -403,11 +436,7 @@ log_eviction_stm::apply_snapshot(stm_snapshot_header header, iobuf&& data) {
}

ss::future<stm_snapshot> log_eviction_stm::take_snapshot() {
vlog(
_logger.trace,
"Taking snapshot at offset: {} for ntp: {}",
last_applied_offset(),
_raft->ntp());
vlog(_log.trace, "Taking snapshot at offset: {}", last_applied_offset());
iobuf snap_data = serde::to_iobuf(
snapshot_data{.effective_start_offset = _delete_records_eviction_offset});
co_return stm_snapshot::create(
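A note on the retry condition: do_write_raft_snapshot() caps the requested truncation point at max_collectible_offset and skips the write entirely when the capped point does not advance past the last raft snapshot; an event with wait_for_success set then stays at the head of the queue and is retried after retry_backoff_time. A small sketch of just that clamping rule (clamp_truncation_point is a hypothetical helper, not a function from this patch):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>

// Hypothetical helper mirroring the clamping in do_write_raft_snapshot():
// returns the offset to snapshot at, or nullopt when nothing can be done yet.
std::optional<int64_t> clamp_truncation_point(
  int64_t requested, int64_t max_collectible, int64_t last_snapshot_index) {
    int64_t point = std::min(requested, max_collectible);
    if (point <= last_snapshot_index) {
        // Already snapshotted past this point, or capped below it.
        return std::nullopt;
    }
    return point;
}

int main() {
    // Collectible only up to 80 and last snapshot already at 90:
    // nothing to write, the event is retried later.
    std::cout << clamp_truncation_point(100, 80, 90).has_value() << '\n'; // 0
    // Collectible up to 95, last snapshot at 90: snapshot at 95 now,
    // the remainder (96..100) is retried after the backoff.
    std::cout << *clamp_truncation_point(100, 95, 90) << '\n'; // 95
}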