cloud_storage: move eviction under remote_partition
CONFLICT:
- required adding an abort source to remote_partition

Previously each remote_partition would wait for an eviction barrier to
pass through the shared eviction loop, ensuring that all of its segments
were stopped and destructed before the partition finished stopping. Each
segment references members of its remote_partition, so it's important
that the shutdown sequence stops the segments before destructing the
remote_partition. At the same time, having each partition wait for
another set of partitions to finish flushing can result in a slow
shutdown.

This commit moves the eviction loop into the remote_partition, allowing
partition shutdown to avoid waiting on any other partition entirely,
while still ensuring that each underlying segment is stopped before its
remote_partition is destructed.

Without this commit, I witnessed the partition-shutdown phase on a
heavily loaded server take 30 minutes. With this commit, I see a
similarly shaped shutdown take 10 seconds.
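
For illustration, the shape of the new per-partition eviction fiber is
roughly the following self-contained Seastar sketch (a simplified
sketch, not the patch itself: 'stoppable' and 'partition' are
hypothetical stand-ins, and the real code runs the fiber under a gate
via ssx::spawn_with_gate and queues a variant of readers, segments, and
barriers):

    #include <seastar/core/condition-variable.hh>
    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>

    #include <deque>
    #include <memory>
    #include <utility>

    // Hypothetical stand-in for remote_segment / remote_segment_batch_reader.
    struct stoppable {
        seastar::future<> stop() { return seastar::make_ready_future<>(); }
    };

    class partition {
    public:
        void start() {
            // One eviction fiber per partition: shutdown never waits on
            // another partition's queue.
            _loop = run_eviction_loop();
        }

        void evict(std::unique_ptr<stoppable> item) {
            _pending.push_back(std::move(item));
            _cvar.signal();
        }

        seastar::future<> stop() {
            _cvar.broken(); // wakes the loop with broken_condition_variable
            co_await std::move(_loop);
            // Final pass: stop anything enqueued after the loop exited,
            // so no item outlives the partition it references.
            for (auto& item : _pending) {
                co_await item->stop();
            }
        }

    private:
        seastar::future<> run_eviction_loop() {
            try {
                while (true) {
                    co_await _cvar.wait([this] { return !_pending.empty(); });
                    auto in_flight = std::exchange(_pending, {});
                    for (auto& item : in_flight) {
                        co_await item->stop();
                    }
                }
            } catch (const seastar::broken_condition_variable&) {
                // Expected exit path on shutdown.
            }
        }

        seastar::condition_variable _cvar;
        std::deque<std::unique_ptr<stoppable>> _pending;
        seastar::future<> _loop = seastar::make_ready_future<>();
    };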

Related #9569

(cherry picked from commit 03587d8)
andrwng committed Apr 10, 2023
1 parent b715116 commit 9de1010
Showing 5 changed files with 113 additions and 113 deletions.
76 changes: 14 additions & 62 deletions src/v/cloud_storage/materialized_segments.cc
@@ -60,30 +60,11 @@ materialized_segments::materialized_segments()
ss::future<> materialized_segments::stop() {
cst_log.debug("Stopping materialized_segments...");
_stm_timer.cancel();
_cvar.broken();

co_await _gate.close();

// Do a final pass over the eviction list to stop any items that were
// enqueued (e.g. by returning readers) after the eviction loop stopped.
for (auto& rs : _eviction_pending) {
co_await std::visit(
[](auto&& rs) {
if (!rs->is_stopped()) {
return rs->stop();
} else {
return ss::make_ready_future<>();
}
},
rs);
}
cst_log.debug("Stopped materialized_segments...");
}
ss::future<> materialized_segments::start() {
// Fiber that consumes from _eviction_list and calls stop
// on items before destroying them
ssx::spawn_with_gate(_gate, [this] { return run_eviction_loop(); });

// Timer to invoke TTL eviction of segments
_stm_timer.set_callback([this] {
trim_segments(std::nullopt);
@@ -112,47 +93,6 @@ size_t materialized_segments::current_segments() const {
return _segment_units.outstanding();
}

void materialized_segments::evict_reader(
std::unique_ptr<remote_segment_batch_reader> reader) {
_eviction_pending.push_back(std::move(reader));
_cvar.signal();
}
void materialized_segments::evict_segment(
ss::lw_shared_ptr<remote_segment> segment) {
_eviction_pending.push_back(std::move(segment));
_cvar.signal();
}

ss::future<> materialized_segments::flush_evicted() {
if (_eviction_pending.empty() && _eviction_in_flight.empty()) {
// Fast path, avoid waking up the eviction loop if there is no work.
co_return;
}

auto barrier = ss::make_lw_shared<eviction_barrier>();

// Write a barrier to the list and wait for the eviction consumer
// to reach it: this ensures everything enqueued before the barrier has
// been stopped.
_eviction_pending.push_back(barrier);
_cvar.signal();

co_await barrier->promise.get_future();
}

ss::future<> materialized_segments::run_eviction_loop() {
// Evict readers asynchronously
while (true) {
co_await _cvar.wait([this] { return !_eviction_pending.empty(); });
_eviction_in_flight = std::exchange(_eviction_pending, {});
while (!_eviction_in_flight.empty()) {
co_await std::visit(
[](auto&& rs) { return rs->stop(); },
_eviction_in_flight.front());
_eviction_in_flight.pop_front();
}
}
}

void materialized_segments::register_segment(materialized_segment_state& s) {
_materialized.push_back(s);
}
@@ -236,7 +176,13 @@ void materialized_segments::trim_readers(size_t target_free) {
// Readers hold a reference to the segment, so for the
// segment.owned() check to pass, we need to clear them out.
while (!st.readers.empty() && _reader_units.current() < target_free) {
evict_reader(std::move(st.readers.front()));
auto partition = st.parent;
// TODO: consider asserting here instead: it's a bug for
// 'partition' to be null, since readers should not outlive the
// partition, and we can't create new readers once the partition has
// been shut down.
if (likely(partition)) {
partition->evict_reader(std::move(st.readers.front()));
}
st.readers.pop_front();
}
}
@@ -350,7 +296,13 @@ void materialized_segments::maybe_trim_segment(
// Readers hold a reference to the segment, so for the
// segment.owned() check to pass, we need to clear them out.
while (!st.readers.empty()) {
evict_reader(std::move(st.readers.front()));
// TODO: consider asserting here instead: it's a bug for
// 'partition' to be null, since readers should not outlive the
// partition, and we can't create new readers once the partition has
// been shut down.
auto partition = st.parent;
if (likely(partition)) {
partition->evict_reader(std::move(st.readers.front()));
}
st.readers.pop_front();
}
}
38 changes: 0 additions & 38 deletions src/v/cloud_storage/materialized_segments.h
@@ -52,18 +52,10 @@ class materialized_segments {

void register_segment(materialized_segment_state& s);

/// Put reader into the eviction list which will
/// eventually lead to it being closed and deallocated
void evict_reader(std::unique_ptr<remote_segment_batch_reader> reader);
void evict_segment(ss::lw_shared_ptr<remote_segment> segment);

ssx::semaphore_units get_reader_units();

ssx::semaphore_units get_segment_units();

/// Wait until any evicted items in the _eviction_list have been removed.
ss::future<> flush_evicted();

private:
/// Timer use to periodically evict stale readers
ss::timer<ss::lowres_clock> _stm_timer;
@@ -82,33 +74,6 @@
/// How many materialized_segment_state instances exist
size_t current_segments() const;

/// Special item in eviction_list that holds a promise and sets it
/// when the eviction fiber calls stop() (see flush_evicted)
struct eviction_barrier {
ss::promise<> promise;

ss::future<> stop() {
promise.set_value();
stopped = true;
return ss::now();
}

bool stopped{false};

bool is_stopped() const { return stopped; }
};

using evicted_resource_t = std::variant<
std::unique_ptr<remote_segment_batch_reader>,
ss::lw_shared_ptr<remote_segment>,
ss::lw_shared_ptr<eviction_barrier>>;
using eviction_list_t = std::deque<evicted_resource_t>;

/// List of segments and readers waiting to have their stop() method
/// called before destruction
eviction_list_t _eviction_pending;
eviction_list_t _eviction_in_flight;

// We need to quickly look up readers by segment, to find any readers
// for a segment that is targeted by a read. Within those readers,
// we may do a linear scan to find if any of those readers matches
@@ -118,9 +83,6 @@
&materialized_segment_state::_hook>
_materialized;

/// Kick this condition variable when appending to eviction_list
ss::condition_variable _cvar;

/// Gate for background eviction
ss::gate _gate;

73 changes: 62 additions & 11 deletions src/v/cloud_storage/remote_partition.cc
@@ -26,6 +26,7 @@
#include "utils/retry_chain_node.h"
#include "utils/stream_utils.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/loop.hh>
@@ -48,6 +49,7 @@ using storage_t = model::record_batch_reader::storage_t;

remote_partition::iterator
remote_partition::materialize_segment(const segment_meta& meta) {
_as.check();
auto base_kafka_offset = meta.base_offset - meta.delta_offset;
auto units = materialized().get_segment_units();
auto st = std::make_unique<materialized_segment_state>(
@@ -340,6 +342,10 @@ class partition_record_batch_reader_impl final
vlog(
_ctxlog.debug,
"gate_closed_exception while reading from remote_partition");
} catch (const ss::abort_requested_exception&) {
vlog(
_ctxlog.debug,
"abort_requested_exception while reading from remote_partition");
} catch (const std::exception& e) {
vlog(
_ctxlog.warn,
@@ -463,7 +469,7 @@ class partition_record_batch_reader_impl final
_reader->max_rp_offset(),
_reader->is_eof(),
_next_segment_base_offset);
_partition->materialized().evict_reader(std::move(_reader));
_partition->evict_reader(std::move(_reader));
vlog(
_ctxlog.debug,
"initializing new segment reader {}, next offset",
@@ -521,15 +527,47 @@

remote_partition::remote_partition(
const partition_manifest& m, remote& api, cache& c, s3::bucket_name bucket)
: _rtc()
: _rtc(_as)
, _ctxlog(cst_log, _rtc, m.get_ntp().path())
, _api(api)
, _cache(c)
, _manifest(m)
, _bucket(std::move(bucket))
, _probe(m.get_ntp()) {}

ss::future<> remote_partition::start() { co_return; }
ss::future<> remote_partition::start() {
// Fiber that consumes from _eviction_list and calls stop on items before
// destroying them.
ssx::spawn_with_gate(_gate, [this] { return run_eviction_loop(); });
co_return;
}

void remote_partition::evict_reader(
std::unique_ptr<remote_segment_batch_reader> reader) {
_eviction_pending.push_back(std::move(reader));
_has_evictions_cvar.signal();
}

void remote_partition::evict_segment(
ss::lw_shared_ptr<remote_segment> segment) {
_eviction_pending.push_back(std::move(segment));
_has_evictions_cvar.signal();
}

ss::future<> remote_partition::run_eviction_loop() {
// Evict readers asynchronously.
// NOTE: exits when the condition variable is broken.
while (true) {
co_await _has_evictions_cvar.wait(
[this] { return !_eviction_pending.empty(); });
auto eviction_in_flight = std::exchange(_eviction_pending, {});
co_await ss::max_concurrent_for_each(
eviction_in_flight, 200, [](auto&& rs_variant) {
return std::visit(
[](auto&& rs) { return rs->stop(); }, rs_variant);
});
}
}

kafka::offset remote_partition::first_uploaded_offset() {
vassert(
@@ -633,6 +671,12 @@ remote_partition::aborted_transactions(offset_range offsets) {
ss::future<> remote_partition::stop() {
vlog(_ctxlog.debug, "remote partition stop {} segments", _segments.size());

// Prevent further segment materialization, reader creation, etc.
_as.request_abort();

// Signal to the eviction loop that it should terminate.
_has_evictions_cvar.broken();

co_await _gate.close();
// Remove materialized_segment_state from the list that contains it, to
// avoid it getting registered for eviction and stop.
@@ -652,13 +696,20 @@ ss::future<> remote_partition::stop() {
co_await segment->stop();
}

// We may have some segment or reader objects enqueued for stop in
// the shared eviction queue: must flush it, or they can outlive
// us and trigger assertion in retry_chain_node destructor.
// This waits for all evictions, not just ours, but that's okay because
// stopping readers is fast, the queue is not usually long, and destroying
// partitions is relatively infrequent.
co_await materialized().flush_evicted();
// Do a final pass over the eviction list to stop any items that were
// enqueued (e.g. by returning readers) after the eviction loop stopped.
for (auto& rs : _eviction_pending) {
co_await std::visit(
[](auto&& rs) {
if (!rs->is_stopped()) {
return rs->stop();
} else {
return ss::make_ready_future<>();
}
},
rs);
}

vlog(_ctxlog.debug, "remote partition stopped");
}

@@ -674,7 +725,7 @@ void remote_partition::return_reader(
// which 'it' points to belongs to the new segment.
it->second->return_reader(std::move(reader));
} else {
materialized().evict_reader(std::move(reader));
evict_reader(std::move(reader));
}
}

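A note on the batch drain above: the new run_eviction_loop() stops each
drained batch with ss::max_concurrent_for_each, which caps the number of
in-flight stop() calls (200 in this patch) rather than stopping items
strictly one at a time as the old shared loop did. A minimal usage
sketch of that Seastar helper (hypothetical values, not from the patch):

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/loop.hh>
    #include <seastar/core/sleep.hh>

    #include <chrono>
    #include <vector>

    // Runs the lambda over all elements, at most 3 concurrently, and
    // resolves once every invocation has completed.
    seastar::future<> drain(std::vector<int> ids) {
        co_await seastar::max_concurrent_for_each(ids, 3, [](int id) {
            // Placeholder for rs->stop(): any future-returning operation.
            return seastar::sleep(std::chrono::milliseconds(id));
        });
    }
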
35 changes: 35 additions & 0 deletions src/v/cloud_storage/remote_partition.h
@@ -123,6 +123,10 @@ class remote_partition
/// Hook for materialized_segment to notify us when a segment is evicted
void offload_segment(model::offset);

// Place on the eviction queue.
void evict_reader(std::unique_ptr<remote_segment_batch_reader> reader);
void evict_segment(ss::lw_shared_ptr<remote_segment> segment);

private:
friend struct materialized_segment_state;

@@ -136,6 +140,8 @@ class remote_partition
/// This is exposed for the benefit of the materialized_segment_state
materialized_segments& materialized();

ss::future<> run_eviction_loop();

/// Materialize segment if needed and create a reader
///
/// \param config is a reader config
@@ -179,11 +185,40 @@
retry_chain_node _rtc;
retry_chain_logger _ctxlog;
ss::gate _gate;
ss::abort_source _as;
remote& _api;
cache& _cache;
const partition_manifest& _manifest;
s3::bucket_name _bucket;

/// Special item in eviction_list that holds a promise and sets it
/// when the eviction fiber calls stop() (see flush_evicted)
struct eviction_barrier {
ss::promise<> promise;

ss::future<> stop() {
promise.set_value();
stopped = true;
return ss::now();
}

bool stopped{false};

bool is_stopped() const { return stopped; }
};

using evicted_resource_t = std::variant<
std::unique_ptr<remote_segment_batch_reader>,
ss::lw_shared_ptr<remote_segment>,
ss::lw_shared_ptr<eviction_barrier>>;
using eviction_list_t = std::deque<evicted_resource_t>;

/// Kick this condition variable when appending to eviction_list
ss::condition_variable _has_evictions_cvar;

/// List of segments and readers waiting to have their stop() method
/// called before destruction
eviction_list_t _eviction_pending;
segment_map_t _segments;
partition_probe _probe;
};
4 changes: 2 additions & 2 deletions src/v/cloud_storage/segment_state.cc
@@ -20,9 +20,9 @@ namespace cloud_storage {
void materialized_segment_state::offload(remote_partition* partition) {
_hook.unlink();
for (auto&& rs : readers) {
partition->materialized().evict_reader(std::move(rs));
partition->evict_reader(std::move(rs));
}
partition->materialized().evict_segment(std::move(segment));
partition->evict_segment(std::move(segment));
partition->_probe.segment_offloaded();
}

