Skip to content

Commit

Permalink
Merge pull request #9933 from andrwng/v22.3.x-evict-from-partition
Browse files Browse the repository at this point in the history
[v22.3.x] cloud_storage: move eviction under remote_partition
  • Loading branch information
andrwng authored Apr 27, 2023
2 parents a51ec01 + c09f0c7 commit 2099e16
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 123 deletions.
76 changes: 14 additions & 62 deletions src/v/cloud_storage/materialized_segments.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,11 @@ materialized_segments::materialized_segments()
/// Shut down the materialized_segments service.
///
/// Cancels the TTL timer, breaks the condition variable the eviction loop
/// waits on, waits for the gate to close, and finally stops whatever is still
/// sitting in the pending eviction list.
ss::future<> materialized_segments::stop() {
    cst_log.debug("Stopping materialized_segments...");
    _stm_timer.cancel();
    _cvar.broken();

    co_await _gate.close();

    // Readers may hand items back after the eviction loop has already
    // finished, so sweep the pending list one last time and stop anything
    // that has not been stopped yet.
    for (auto& pending : _eviction_pending) {
        co_await std::visit(
          [](auto&& item) {
              return item->is_stopped() ? ss::make_ready_future<>()
                                        : item->stop();
          },
          pending);
    }
    cst_log.debug("Stopped materialized_segments...");
}
ss::future<> materialized_segments::start() {
// Fiber that consumes from _eviction_list and calls stop
// on items before destroying them
ssx::spawn_with_gate(_gate, [this] { return run_eviction_loop(); });

// Timer to invoke TTL eviction of segments
_stm_timer.set_callback([this] {
trim_segments(std::nullopt);
Expand Down Expand Up @@ -114,47 +95,6 @@ size_t materialized_segments::current_segments() const {
return _segment_units.outstanding();
}

/// Hand a reader over for asynchronous eviction: it is appended to the
/// pending eviction list and the condition variable is signalled.
void materialized_segments::evict_reader(
  std::unique_ptr<remote_segment_batch_reader> reader) {
    _eviction_pending.emplace_back(std::move(reader));
    _cvar.signal();
}
/// Hand a segment over for asynchronous eviction: it is appended to the
/// pending eviction list and the condition variable is signalled.
void materialized_segments::evict_segment(
  ss::lw_shared_ptr<remote_segment> segment) {
    _eviction_pending.emplace_back(std::move(segment));
    _cvar.signal();
}

/// Wait for the eviction loop to catch up with the current eviction queue.
///
/// Returns immediately when both the pending and the in-flight lists are
/// empty. Otherwise a barrier item is appended to the pending list and the
/// returned future resolves once that barrier's promise has been fulfilled.
ss::future<> materialized_segments::flush_evicted() {
    const bool nothing_pending = _eviction_pending.empty();
    const bool nothing_in_flight = _eviction_in_flight.empty();
    if (nothing_pending && nothing_in_flight) {
        // Fast path: no work queued, so do not wake the eviction loop.
        co_return;
    }

    // Append a barrier to the list and wake the eviction consumer. The
    // barrier's promise is fulfilled when the consumer calls stop() on it.
    auto barrier = ss::make_lw_shared<eviction_barrier>();
    _eviction_pending.push_back(barrier);
    _cvar.signal();

    co_await barrier->promise.get_future();
}

/// Background fiber that stops evicted readers and segments.
///
/// The loop has no explicit exit condition; presumably it terminates when
/// stop() breaks _cvar and the wait below fails (TODO confirm).
ss::future<> materialized_segments::run_eviction_loop() {
    for (;;) {
        // Sleep until at least one item has been queued for eviction.
        co_await _cvar.wait([this] { return !_eviction_pending.empty(); });

        // Move the whole pending batch into the in-flight list, leaving the
        // pending list empty for new arrivals.
        _eviction_in_flight = std::exchange(_eviction_pending, {});
        co_await ss::max_concurrent_for_each(
          _eviction_in_flight, 1024, [](auto&& resource) {
              return std::visit(
                [](auto&& item) { return item->stop(); }, resource);
          });
        _eviction_in_flight.clear();
    }
}

/// Append the given segment state to the _materialized list.
void materialized_segments::register_segment(materialized_segment_state& s) {
_materialized.push_back(s);
}
Expand Down Expand Up @@ -238,7 +178,13 @@ void materialized_segments::trim_readers(size_t target_free) {
// Readers hold a reference to the segment, so for the
// segment.owned() check to pass, we need to clear them out.
while (!st.readers.empty() && _reader_units.current() < target_free) {
evict_reader(std::move(st.readers.front()));
auto partition = st.parent;
// TODO: consider asserting here instead: it's a bug for
// 'partition' to be null, since the partition outlives its readers, and
// we can't create new readers if the partition has been shut down.
if (likely(partition)) {
partition->evict_reader(std::move(st.readers.front()));
}
st.readers.pop_front();
}
}
Expand Down Expand Up @@ -352,7 +298,13 @@ void materialized_segments::maybe_trim_segment(
// Readers hold a reference to the segment, so for the
// segment.owned() check to pass, we need to clear them out.
while (!st.readers.empty()) {
evict_reader(std::move(st.readers.front()));
// TODO: consider asserting here instead: it's a bug for
// 'partition' to be null, since the partition outlives its readers, and
// we can't create new readers if the partition has been shut down.
auto partition = st.parent;
if (likely(partition)) {
partition->evict_reader(std::move(st.readers.front()));
}
st.readers.pop_front();
}
}
Expand Down
38 changes: 0 additions & 38 deletions src/v/cloud_storage/materialized_segments.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,10 @@ class materialized_segments {

void register_segment(materialized_segment_state& s);

/// Put reader into the eviction list which will
/// eventually lead to it being closed and deallocated
void evict_reader(std::unique_ptr<remote_segment_batch_reader> reader);
void evict_segment(ss::lw_shared_ptr<remote_segment> segment);

ssx::semaphore_units get_reader_units();

ssx::semaphore_units get_segment_units();

/// Wait until any evicted items in the _eviction_list have been removed.
ss::future<> flush_evicted();

private:
/// Timer used to periodically evict stale readers
ss::timer<ss::lowres_clock> _stm_timer;
Expand All @@ -82,33 +74,6 @@ class materialized_segments {
/// How many materialized_segment_state instances exist
size_t current_segments() const;

/// Marker entry for the eviction list. It owns a promise that is fulfilled
/// when stop() is called on it, which lets a caller wait until the eviction
/// fiber has reached this entry (see flush_evicted).
struct eviction_barrier {
    ss::promise<> promise;

    bool stopped{false};

    /// Fulfil the promise and record that this barrier has been stopped.
    /// Always returns an already-resolved future.
    ss::future<> stop() {
        promise.set_value();
        stopped = true;
        return ss::make_ready_future<>();
    }

    /// True once stop() has been called.
    bool is_stopped() const { return stopped; }
};

using evicted_resource_t = std::variant<
std::unique_ptr<remote_segment_batch_reader>,
ss::lw_shared_ptr<remote_segment>,
ss::lw_shared_ptr<eviction_barrier>>;
using eviction_list_t = std::deque<evicted_resource_t>;

/// List of segments and readers waiting to have their stop() method
/// called before destruction
eviction_list_t _eviction_pending;
eviction_list_t _eviction_in_flight;

// We need to quickly look up readers by segment, to find any readers
// for a segment that is targeted by a read. Within those readers,
// we may do a linear scan to find if any of those readers matches
Expand All @@ -118,9 +83,6 @@ class materialized_segments {
&materialized_segment_state::_hook>
_materialized;

/// Kick this condition variable when appending to eviction_list
ss::condition_variable _cvar;

/// Gate for background eviction
ss::gate _gate;

Expand Down
81 changes: 68 additions & 13 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "utils/retry_chain_node.h"
#include "utils/stream_utils.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/loop.hh>
Expand All @@ -48,6 +49,7 @@ using storage_t = model::record_batch_reader::storage_t;

remote_partition::iterator
remote_partition::materialize_segment(const segment_meta& meta) {
_as.check();
auto base_kafka_offset = meta.base_offset - meta.delta_offset;
auto units = materialized().get_segment_units();
auto st = std::make_unique<materialized_segment_state>(
Expand Down Expand Up @@ -340,6 +342,10 @@ class partition_record_batch_reader_impl final
vlog(
_ctxlog.debug,
"gate_closed_exception while reading from remote_partition");
} catch (const ss::abort_requested_exception&) {
vlog(
_ctxlog.debug,
"abort_requested_exception while reading from remote_partition");
} catch (const std::exception& e) {
vlog(
_ctxlog.warn,
Expand Down Expand Up @@ -463,7 +469,7 @@ class partition_record_batch_reader_impl final
_reader->max_rp_offset(),
_reader->is_eof(),
_next_segment_base_offset);
_partition->materialized().evict_reader(std::move(_reader));
_partition->evict_reader(std::move(_reader));
vlog(
_ctxlog.debug,
"initializing new segment reader {}, next offset",
Expand Down Expand Up @@ -521,15 +527,47 @@ class partition_record_batch_reader_impl final

remote_partition::remote_partition(
const partition_manifest& m, remote& api, cache& c, s3::bucket_name bucket)
: _rtc()
: _rtc(_as)
, _ctxlog(cst_log, _rtc, m.get_ntp().path())
, _api(api)
, _cache(c)
, _manifest(m)
, _bucket(std::move(bucket))
, _probe(m.get_ntp()) {}

/// Nothing to start: completes immediately.
ss::future<> remote_partition::start() { co_return; }
/// Launch the background eviction fiber under _gate and return.
ss::future<> remote_partition::start() {
    // The fiber consumes from _eviction_pending and calls stop() on each
    // item before it is destroyed.
    ssx::spawn_with_gate(_gate, [this]() { return run_eviction_loop(); });
    co_return;
}

/// Queue a reader for asynchronous eviction and wake the eviction loop.
void remote_partition::evict_reader(
  std::unique_ptr<remote_segment_batch_reader> reader) {
    _eviction_pending.emplace_back(std::move(reader));
    _has_evictions_cvar.signal();
}

/// Queue a segment for asynchronous eviction and wake the eviction loop.
void remote_partition::evict_segment(
  ss::lw_shared_ptr<remote_segment> segment) {
    _eviction_pending.emplace_back(std::move(segment));
    _has_evictions_cvar.signal();
}

/// Background fiber that stops evicted readers and segments.
///
/// NOTE: there is no explicit exit condition; the loop exits when the
/// condition variable is broken.
ss::future<> remote_partition::run_eviction_loop() {
    for (;;) {
        // Sleep until at least one item has been queued for eviction.
        co_await _has_evictions_cvar.wait(
          [this] { return !_eviction_pending.empty(); });

        // Take the whole pending batch, leaving the member list empty for
        // new arrivals, then stop the batch with bounded concurrency.
        auto eviction_batch = std::exchange(_eviction_pending, {});
        co_await ss::max_concurrent_for_each(
          eviction_batch, 200, [](auto&& resource) {
              return std::visit(
                [](auto&& item) { return item->stop(); }, resource);
          });
    }
}

kafka::offset remote_partition::first_uploaded_offset() {
vassert(
Expand Down Expand Up @@ -637,6 +675,12 @@ remote_partition::aborted_transactions(offset_range offsets) {
ss::future<> remote_partition::stop() {
vlog(_ctxlog.debug, "remote partition stop {} segments", _segments.size());

// Prevent further segment materialization, readers, etc.
_as.request_abort();

// Signal to the eviction loop that it should terminate.
_has_evictions_cvar.broken();

co_await _gate.close();
// Remove materialized_segment_state from the list that contains it, to
// avoid it getting registered for eviction and stop.
Expand All @@ -660,13 +704,20 @@ ss::future<> remote_partition::stop() {
return segment->stop();
});

// We may have some segment or reader objects enqueued for stop in
// the shared eviction queue: must flush it, or they can outlive
// us and trigger assertion in retry_chain_node destructor.
// This waits for all evictions, not just ours, but that's okay because
// stopping readers is fast, the queue is not usually long, and destroying
// partitions is relatively infrequent.
co_await materialized().flush_evicted();
// Do the last pass over the eviction list to stop remaining items returned
// from readers after the eviction loop stopped.
for (auto& rs : _eviction_pending) {
co_await std::visit(
[](auto&& rs) {
if (!rs->is_stopped()) {
return rs->stop();
} else {
return ss::make_ready_future<>();
}
},
rs);
}

vlog(_ctxlog.debug, "remote partition stopped");
}

Expand All @@ -682,7 +733,7 @@ void remote_partition::return_reader(
// which 'it' points to belongs to the new segment.
it->second->return_reader(std::move(reader));
} else {
materialized().evict_reader(std::move(reader));
evict_reader(std::move(reader));
}
}

Expand Down Expand Up @@ -819,7 +870,7 @@ ss::future<bool> remote_partition::tolerant_delete_object(
* S3, not the state of the archival metadata stm (which can be out
* of date if e.g. we were not the leader)
*/
ss::future<> remote_partition::erase() {
ss::future<> remote_partition::erase(ss::abort_source& as) {
// TODO: Edge case 1
// There is a rare race in which objects might get left behind in S3.
//
Expand All @@ -843,7 +894,11 @@ ss::future<> remote_partition::erase() {

static constexpr ss::lowres_clock::duration erase_timeout = 60s;
static constexpr ss::lowres_clock::duration erase_backoff = 1s;
retry_chain_node local_rtc(erase_timeout, erase_backoff, &_rtc);

// This function is called after ::stop, so we may not use our
// main retry_chain_node which is bound to our abort source,
// and construct a special one.
retry_chain_node local_rtc(as, erase_timeout, erase_backoff);

// Read the partition manifest fresh: we might already have
// dropped local archival_stm state related to this partition.
Expand Down
37 changes: 36 additions & 1 deletion src/v/cloud_storage/remote_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,15 @@ class remote_partition
retry_chain_node& parent);

/// Remove objects from S3
ss::future<> erase();
ss::future<> erase(ss::abort_source&);

/// Hook for materialized_segment to notify us when a segment is evicted
void offload_segment(model::offset);

// Place on the eviction queue.
void evict_reader(std::unique_ptr<remote_segment_batch_reader> reader);
void evict_segment(ss::lw_shared_ptr<remote_segment> segment);

private:
friend struct materialized_segment_state;

Expand All @@ -143,6 +147,8 @@ class remote_partition
/// This is exposed for the benefit of the materialized_segment_state
materialized_segments& materialized();

ss::future<> run_eviction_loop();

/// Materialize segment if needed and create a reader
///
/// \param config is a reader config
Expand Down Expand Up @@ -186,11 +192,40 @@ class remote_partition
retry_chain_node _rtc;
retry_chain_logger _ctxlog;
ss::gate _gate;
ss::abort_source _as;
remote& _api;
cache& _cache;
const partition_manifest& _manifest;
s3::bucket_name _bucket;

/// Marker entry for the eviction list. It owns a promise that is fulfilled
/// when stop() is called on it.
/// NOTE(review): nothing in the visible remote_partition code enqueues a
/// barrier -- confirm whether this type is still needed here.
struct eviction_barrier {
    ss::promise<> promise;

    bool stopped{false};

    /// Fulfil the promise and record that this barrier has been stopped.
    /// Always returns an already-resolved future.
    ss::future<> stop() {
        promise.set_value();
        stopped = true;
        return ss::make_ready_future<>();
    }

    /// True once stop() has been called.
    bool is_stopped() const { return stopped; }
};

using evicted_resource_t = std::variant<
std::unique_ptr<remote_segment_batch_reader>,
ss::lw_shared_ptr<remote_segment>,
ss::lw_shared_ptr<eviction_barrier>>;
using eviction_list_t = std::deque<evicted_resource_t>;

/// Kick this condition variable when appending to eviction_list
ss::condition_variable _has_evictions_cvar;

/// List of segments and readers waiting to have their stop() method
/// called before destruction
eviction_list_t _eviction_pending;
segment_map_t _segments;
partition_probe _probe;
};
Expand Down
Loading

0 comments on commit 2099e16

Please sign in to comment.