Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v22.3.x] cluster/archival_metadata_stm: std::vector -> fragmented_vector #9046

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/v/cloud_storage/partition_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class partition_manifest final : public base_manifest {
model::offset lo,
model::offset lco,
model::offset insync,
const std::vector<segment_t>& segments,
const std::vector<segment_t>& replaced)
const fragmented_vector<segment_t>& segments,
const fragmented_vector<segment_t>& replaced)
: _ntp(std::move(ntp))
, _rev(rev)
, _last_offset(lo)
Expand Down
16 changes: 7 additions & 9 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "serde/serde.h"
#include "ssx/future-util.h"
#include "storage/record_batch_builder.h"
#include "utils/fragmented_vector.h"
#include "utils/named_type.h"
#include "vlog.h"

Expand Down Expand Up @@ -85,9 +86,9 @@ struct archival_metadata_stm::snapshot
: public serde::
envelope<snapshot, serde::version<1>, serde::compat_version<0>> {
/// List of segments
std::vector<segment> segments;
fragmented_vector<segment> segments;
/// List of replaced segments
std::vector<segment> replaced;
fragmented_vector<segment> replaced;
/// Start offset (might be different from the base offset of the first
/// segment). Default value means that the snapshot was old and didn't
/// have start_offset. In this case we need to set it to compute it from
Expand Down Expand Up @@ -186,12 +187,10 @@ command_batch_builder archival_metadata_stm::batch_start(
return {*this, deadline, as};
}

std::vector<archival_metadata_stm::segment>
fragmented_vector<archival_metadata_stm::segment>
archival_metadata_stm::segments_from_manifest(
const cloud_storage::partition_manifest& manifest) {
std::vector<segment> segments;
segments.reserve(manifest.size());

fragmented_vector<segment> segments;
for (auto [key, meta] : manifest) {
if (meta.ntp_revision == model::initial_revision_id{}) {
meta.ntp_revision = manifest.get_revision_id();
Expand All @@ -212,12 +211,11 @@ archival_metadata_stm::segments_from_manifest(
return segments;
}

std::vector<archival_metadata_stm::segment>
fragmented_vector<archival_metadata_stm::segment>
archival_metadata_stm::replaced_segments_from_manifest(
const cloud_storage::partition_manifest& manifest) {
auto replaced = manifest.replaced_segments();
std::vector<segment> segments;
segments.reserve(replaced.size());
fragmented_vector<segment> segments;
for (auto meta : replaced) {
if (meta.ntp_revision == model::initial_revision_id{}) {
meta.ntp_revision = manifest.get_revision_id();
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ class archival_metadata_stm final : public persisted_stm {

friend segment segment_from_meta(const cloud_storage::segment_meta& meta);

static std::vector<segment>
static fragmented_vector<segment>
segments_from_manifest(const cloud_storage::partition_manifest& manifest);

static std::vector<segment> replaced_segments_from_manifest(
static fragmented_vector<segment> replaced_segments_from_manifest(
const cloud_storage::partition_manifest& manifest);

void apply_add_segment(const segment& segment);
Expand Down
163 changes: 141 additions & 22 deletions src/v/utils/fragmented_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
#include "vassert.h"

#include <cstddef>
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <vector>

namespace test_details {
struct fragmented_vector_accessor;
}

/**
* A very very simple fragmented vector that provides random access like a
* vector, but does not store its data in contiguous memory.
Expand All @@ -34,18 +41,41 @@
* all of the space at once we might benefit from the allocator helping with
* giving us contiguous memory.
*/
template<typename T, size_t fragment_size = 8192>
template<typename T, size_t max_fragment_size = 8192>
class fragmented_vector {
// calculate the maximum number of elements per fragment while
// keeping the element count a power of two
static constexpr size_t calc_elems_per_frag(size_t esize) {
size_t max = max_fragment_size / esize;
assert(max > 0);
// round down to a power of two
size_t pow2 = 1;
while (pow2 * 2 <= max) {
pow2 *= 2;
}
return pow2;
}

static constexpr size_t elems_per_frag = calc_elems_per_frag(sizeof(T));

static_assert(
(fragment_size & (fragment_size - 1)) == 0,
"fragment size must be a power of 2");
static_assert(fragment_size % sizeof(T) == 0);
static constexpr size_t elems_per_frag = fragment_size / sizeof(T);
(elems_per_frag & (elems_per_frag - 1)) == 0,
"element count per fragment must be a power of 2");
static_assert(elems_per_frag >= 1);

public:
using this_type = fragmented_vector<T, max_fragment_size>;
using value_type = T;

/**
* The maximum number of bytes per fragment as specified in
* as part of the type. Note that for most types, the true
* number of bytes in a full fragment may as low as half
* of this amount (+1) since the number of elements is restricted
* to a power of two.
*/
static constexpr size_t max_frag_bytes = max_fragment_size;

fragmented_vector() noexcept = default;
fragmented_vector& operator=(const fragmented_vector&) noexcept = delete;
fragmented_vector(fragmented_vector&&) noexcept = default;
Expand Down Expand Up @@ -82,9 +112,7 @@ class fragmented_vector {
}

T& operator[](size_t index) {
vassert(index < _size, "Index out of range {}/{}", index, _size);
auto& frag = _frags.at(index / elems_per_frag);
return frag.at(index % elems_per_frag);
return const_cast<T&>(std::as_const(*this)[index]);
}

const T& back() const { return _frags.back().back(); }
Expand All @@ -101,57 +129,148 @@ class fragmented_vector {
return o._frags == _frags;
}

class const_iterator {
/**
* Returns the approximate in-memory size of this vector in bytes.
*/
size_t memory_size() const {
return _frags.size() * (sizeof(_frags[0]) + elems_per_frag * sizeof(T));
}

/**
* Returns the (maximum) number of elements in each fragment of this vector.
*/
static size_t elements_per_fragment() { return elems_per_frag; }

/**
* Assign from a std::vector.
*/
fragmented_vector& operator=(const std::vector<T>& rhs) noexcept {
clear();

for (auto& e : rhs) {
push_back(e);
}

return *this;
}

/**
* Remove all elements from the vector.
*
* Unlike std::vector, this also releases all the memory from
* the vector (since this vector already the same pointer
* and iterator stability guarantees that std::vector provides
* based on non-reallocation and capacity()).
*/
void clear() {
// do the swap dance to actually clear the memory held by the vector
std::vector<std::vector<T>>{}.swap(_frags);
_size = 0;
_capacity = 0;
}

template<bool C>
class iter {
public:
using iterator_category = std::random_access_iterator_tag;
using value_type = const T;
using value_type = typename std::conditional_t<C, const T, T>;
using difference_type = std::ptrdiff_t;
using pointer = const T*;
using reference = const T&;
using pointer = value_type*;
using reference = value_type&;

iter() = default;

reference operator*() const { return _vec->operator[](_index); }

const_iterator& operator+=(ssize_t n) {
iter& operator+=(ssize_t n) {
_index += n;
return *this;
}

const_iterator& operator++() {
iter& operator-=(ssize_t n) {
_index -= n;
return *this;
}

iter& operator++() {
++_index;
return *this;
}

const_iterator& operator--() {
iter& operator--() {
--_index;
return *this;
}

bool operator==(const const_iterator&) const = default;
iter operator++(int) {
auto tmp = *this;
++*this;
return tmp;
}

iter operator--(int) {
auto tmp = *this;
--*this;
return tmp;
}

iter operator+(difference_type offset) { return iter{*this} += offset; }
iter operator-(difference_type offset) { return iter{*this} -= offset; }

friend ssize_t
operator-(const const_iterator& a, const const_iterator& b) {
bool operator==(const iter&) const = default;
auto operator<=>(const iter&) const = default;

friend ssize_t operator-(const iter& a, const iter& b) {
return a._index - b._index;
}

private:
friend class fragmented_vector;
using vec_type = std::conditional_t<C, const this_type, this_type>;

const_iterator(
const fragmented_vector<T, fragment_size>* vec, size_t index)
iter(vec_type* vec, size_t index)
: _index(index)
, _vec(vec) {}

size_t _index;
const fragmented_vector<T, fragment_size>* _vec;
size_t _index{};
vec_type* _vec{};
};

using const_iterator = iter<true>;
using iterator = iter<false>;

iterator begin() { return iterator(this, 0); }
iterator end() { return iterator(this, _size); }

const_iterator begin() const { return const_iterator(this, 0); }
const_iterator end() const { return const_iterator(this, _size); }

const_iterator cbegin() const { return const_iterator(this, 0); }
const_iterator cend() const { return const_iterator(this, _size); }

friend test_details::fragmented_vector_accessor;

friend std::ostream&
operator<<(std::ostream& os, const fragmented_vector& v) {
os << "[";
for (auto& e : v) {
os << e << ",";
}
os << "]";
return os;
}

private:
fragmented_vector(const fragmented_vector&) noexcept = default;

size_t _size{0};
size_t _capacity{0};
std::vector<std::vector<T>> _frags;
};

/**
* An alias for a fragmented_vector using a larger fragment size, close
* to the limit of the maximum contiguous allocation size.
*/
template<typename T>
using large_fragment_vector = fragmented_vector<T, 32 * 1024>;
Loading