Skip to content

Commit

Permalink
Merge pull request #14954 from vbotbuildovich/backport-pr-14856-v23.2…
Browse files Browse the repository at this point in the history
….x-294

[v23.2.x] segment_meta_cstore: handle gaps and overlaps
  • Loading branch information
piyushredpanda authored Nov 14, 2023
2 parents 03a3b05 + f76a52b commit 8d33fbc
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 26 deletions.
17 changes: 9 additions & 8 deletions src/v/cloud_storage/segment_meta_cstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ class column_store
// replacements either start before or exactly at 0
return size_t{0};
}
auto candidate = _base_offset.find(offset_seg_it->first());
auto candidate = _base_offset.lower_bound(offset_seg_it->first());
if (candidate.is_end()) {
// replacements are append only, return an index that will
// signal this
Expand Down Expand Up @@ -398,12 +398,11 @@ class column_store
last_hint_tosave,
_hints.end()};

auto unchanged_committed_offset
= replacement_store.last_committed_offset().value_or(
model::offset::min());
auto unchanged_base_offset = replacement_store._base_offset.last_value()
.value_or(model::offset::min()());

// iterator pointing to first segment not cloned into replacement_store
auto old_segments_it = upper_bound(unchanged_committed_offset());
auto old_segments_it = upper_bound(unchanged_base_offset);
auto old_segments_end = end();

// merge replacements and old segments into new store
Expand All @@ -413,9 +412,9 @@ class column_store
++offset_seg_it;

auto old_seg = dereference(old_segments_it);
// append old segments with committed offset smaller than
// append old segments with base_offset smaller than
// replacement
while (old_seg.committed_offset < replacement_base_offset) {
while (old_seg.base_offset < replacement_base_offset) {
replacement_store.append(old_seg);
details::increment_all(old_segments_it);
if (old_segments_it == old_segments_end) {
Expand Down Expand Up @@ -1010,7 +1009,9 @@ class segment_meta_cstore::impl
if (_write_buffer.size() > 1) {
auto not_replaced_segment = std::find_if(
std::next(m_it), _write_buffer.end(), [&](auto& kv) {
return kv.first > m.committed_offset;
// first element with a committed offset that spans over the
// range of m
return kv.second.committed_offset > m.committed_offset;
});
// if(next(m_it) == not_replaced_segment) there is nothing to erase,
// _write_buffer.erase would do nothing
Expand Down
30 changes: 12 additions & 18 deletions src/v/cloud_storage/segment_meta_cstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,11 @@ class segment_meta_column_impl
}
}

// crtp helper
auto to_underlying() const -> decltype(auto) {
return *static_cast<Derived const*>(this);
}

public:
/// Position in the column, can be used by
/// index lookup operations
Expand Down Expand Up @@ -694,18 +699,15 @@ class segment_meta_column_impl
const_iterator end() const { return const_iterator(); }

const_iterator find(value_t value) const {
return static_cast<const Derived*>(this)
->template pred_search<std::equal_to<value_t>>(value);
return to_underlying().pred_search(value, std::equal_to<>{});
}

const_iterator upper_bound(value_t value) const {
return static_cast<const Derived*>(this)
->template pred_search<std::greater<value_t>>(value);
return to_underlying().pred_search(value, std::greater<>{});
}

const_iterator lower_bound(value_t value) const {
return static_cast<const Derived*>(this)
->template pred_search<std::greater_equal<value_t>>(value);
return to_underlying().pred_search(value, std::greater_equal<>{});
}

std::optional<value_t> last_value() const {
Expand Down Expand Up @@ -782,12 +784,6 @@ class segment_meta_column_impl
}

protected:
template<class PredT>
const_iterator pred_search_impl(value_t value) const {
return static_cast<const Derived*>(this)->template pred_search<PredT>(
value);
}

auto get_frame_iterator_by_element_index(size_t ix) {
return std::find_if(
_frames.begin(), _frames.end(), [ix](frame_t const& f) mutable {
Expand Down Expand Up @@ -825,9 +821,8 @@ class segment_meta_column<value_t, details::delta_xor>
using base_t::base_t;
using typename base_t::const_iterator;

template<class PredT>
const_iterator pred_search(value_t value) const {
PredT pred;
const_iterator pred_search(
value_t value, std::regular_invocable<value_t, value_t> auto pred) const {
for (auto it = this->begin(); it != this->end(); ++it) {
if (pred(*it, value)) {
return it;
Expand Down Expand Up @@ -860,9 +855,8 @@ class segment_meta_column<value_t, details::delta_delta<value_t>>
using base_t::base_t;
using typename base_t::const_iterator;

template<class PredT>
const_iterator pred_search(value_t value) const {
PredT pred;
const_iterator pred_search(
value_t value, std::regular_invocable<value_t, value_t> auto pred) const {
auto it = this->_frames.begin();
size_t index = 0;
for (; it != this->_frames.end(); ++it) {
Expand Down
193 changes: 193 additions & 0 deletions src/v/cloud_storage/tests/segment_meta_cstore_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,199 @@ BOOST_AUTO_TEST_CASE(test_segment_meta_cstore_insert_replacements) {
merged_result.end()));
}

BOOST_AUTO_TEST_CASE(test_segment_meta_cstore_insert_in_gap) {
auto make_seg = [](auto base) {
return segment_meta{
.is_compacted = false,
.size_bytes = 812,
.base_offset = model::offset(base),
.committed_offset = model::offset(base + 9),
.base_timestamp = model::timestamp(1646430092103),
.max_timestamp = model::timestamp(1646430092103),
.delta_offset = model::offset_delta(0),
.archiver_term = model::term_id(2),
.segment_term = model::term_id(0),
.delta_offset_end = model::offset_delta(0),
.sname_format = segment_name_format::v3,
.metadata_size_hint = 0,
};
};

std::vector<size_t> baseline_entries{0, 13, 16, 17, 29, 30, 31, 32};
for (size_t entries : baseline_entries) {
auto base_offset = 0;
std::vector<segment_meta> metas;
segment_meta_cstore store{};

// Insert different number of baseline entries to get differrent
// frame configurations.
for (size_t i = 0; i < entries; ++i) {
auto seg = make_seg(base_offset);
base_offset += 10;

metas.push_back(seg);
store.insert(seg);
}

auto next_seg = make_seg(base_offset);
auto next_next_seg = make_seg(base_offset + 10);
auto next_next_next_seg = make_seg(base_offset + 20);

metas.push_back(next_seg);
metas.push_back(next_next_seg);
metas.push_back(next_next_next_seg);

// Insert two segments and create a gap between them
store.insert(next_seg);
store.insert(next_next_next_seg);

// Flush the write buffer such that the next insert does
// not get re-ordered in the right place.
store.flush_write_buffer();

// Insert in the gap
store.insert(next_next_seg);

BOOST_CHECK_EQUAL(store.size(), metas.size());
BOOST_CHECK(*store.begin() == metas[0]);
BOOST_CHECK_EQUAL(
store.last_segment().value(), metas[metas.size() - 1]);

auto expected_iter = metas.begin();
auto cstore_iter = store.begin();

for (; expected_iter != metas.end(); ++expected_iter, ++cstore_iter) {
BOOST_CHECK_EQUAL(*expected_iter, *cstore_iter);
}
}
}

BOOST_AUTO_TEST_CASE(test_segment_meta_cstore_overlap_no_replace) {
auto make_seg = [](auto base, std::optional<int64_t> last) {
return segment_meta{
.is_compacted = false,
.size_bytes = 812,
.base_offset = model::offset(base),
.committed_offset = model::offset(last ? *last : base + 9),
.base_timestamp = model::timestamp(1646430092103),
.max_timestamp = model::timestamp(1646430092103),
.delta_offset = model::offset_delta(0),
.archiver_term = model::term_id(2),
.segment_term = model::term_id(0),
.delta_offset_end = model::offset_delta(0),
.sname_format = segment_name_format::v3,
.metadata_size_hint = 0,
};
};

std::vector<size_t> baseline_entries{1, 13, 16, 17, 29, 30, 31, 32};
for (size_t entries : baseline_entries) {
auto base_offset = 0;
std::vector<segment_meta> metas;
segment_meta_cstore store{};

// Insert different number of baseline entries to get differrent
// frame configurations.
for (size_t i = 0; i < entries; ++i) {
auto seg = make_seg(base_offset, std::nullopt);
base_offset += 10;

metas.push_back(seg);
store.insert(seg);
}

auto last_seg = store.last_segment();
BOOST_REQUIRE(last_seg.has_value());

// Select a segment that is fully contained by the last segment.
auto next_seg = make_seg(
last_seg->base_offset() - 5, last_seg->base_offset() + 5);
metas.insert(--metas.end(), next_seg);

// Flush the write buffer such that the next insert does
// not get re-ordered in the right place.
store.flush_write_buffer();

// Insert the overlapping segment
store.insert(next_seg);

BOOST_CHECK_EQUAL(store.size(), metas.size());
BOOST_CHECK(*store.begin() == metas[0]);
BOOST_CHECK_EQUAL(
store.last_segment().value(), metas[metas.size() - 1]);

auto expected_iter = metas.begin();
auto cstore_iter = store.begin();

for (; expected_iter != metas.end(); ++expected_iter, ++cstore_iter) {
BOOST_CHECK_EQUAL(*expected_iter, *cstore_iter);
}
}
}

BOOST_AUTO_TEST_CASE(test_segment_meta_cstore_overlap_with_replace) {
auto make_seg = [](auto base, std::optional<int64_t> last) {
return segment_meta{
.is_compacted = false,
.size_bytes = 812,
.base_offset = model::offset(base),
.committed_offset = model::offset(last ? *last : base + 9),
.base_timestamp = model::timestamp(1646430092103),
.max_timestamp = model::timestamp(1646430092103),
.delta_offset = model::offset_delta(0),
.archiver_term = model::term_id(2),
.segment_term = model::term_id(0),
.delta_offset_end = model::offset_delta(0),
.sname_format = segment_name_format::v3,
.metadata_size_hint = 0,
};
};

std::vector<size_t> baseline_entries{13, 16, 17, 29, 30, 31, 32};
for (size_t entries : baseline_entries) {
auto base_offset = 0;
std::vector<segment_meta> metas;
segment_meta_cstore store{};

// Insert different number of baseline entries to get differrent
// frame configurations.
for (size_t i = 0; i < entries; ++i) {
auto seg = make_seg(base_offset, std::nullopt);
base_offset += 10;

metas.push_back(seg);
store.insert(seg);
}

auto replaced_seg = metas[metas.size() - 2];

// Select a segment that fully includes the penultimate segment.
auto next_seg = make_seg(
replaced_seg.base_offset() - 5, replaced_seg.committed_offset() + 5);
metas.insert(metas.end() - 2, next_seg);
metas.erase(metas.end() - 2);

// Flush the write buffer such that the next insert does
// not get re-ordered in the right place.
store.flush_write_buffer();

// Insert the overlapping segment
store.insert(next_seg);

BOOST_CHECK_EQUAL(store.size(), metas.size());
BOOST_CHECK(*store.begin() == metas[0]);
BOOST_CHECK_EQUAL(
store.last_segment().value(), metas[metas.size() - 1]);

auto expected_iter = metas.begin();
auto cstore_iter = store.begin();

for (; expected_iter != metas.end(); ++expected_iter, ++cstore_iter) {
BOOST_CHECK_EQUAL(*expected_iter, *cstore_iter);
}
}
}

BOOST_AUTO_TEST_CASE(test_segment_meta_cstore_append_retrieve_edge_case) {
// test to trigger a corner case, where the _hints vector is misaligned with
// the frames basically the first element of a frame is not tracked by a
Expand Down

0 comments on commit 8d33fbc

Please sign in to comment.