Skip to content

Commit

Permalink
rm_stm: replace most std::vector with fragmented_vector
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Mar 16, 2023
1 parent bfd7dc0 commit ef9769f
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 98 deletions.
2 changes: 1 addition & 1 deletion src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ ss::future<cloud_storage::upload_result> ntp_archiver::upload_tx(

auto path = segment_path_for_candidate(candidate);

cloud_storage::tx_range_manifest manifest(path, tx_range);
cloud_storage::tx_range_manifest manifest(path, std::move(tx_range));

co_return co_await _remote.upload_manifest(
get_bucket_name(), manifest, fib, _tx_tags);
Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_storage/tests/tx_range_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ SEASTAR_THREAD_TEST_CASE(empty_serialization_roundtrip_test) {
}

SEASTAR_THREAD_TEST_CASE(serialization_roundtrip_test) {
tx_range_manifest m(segment_path, ranges);
fragmented_vector<tx_range_t> tx_ranges;
tx_ranges = ranges;
tx_range_manifest m(segment_path, std::move(tx_ranges));
auto [is, size] = m.serialize().get();
iobuf buf;
auto os = make_iobuf_ref_output_stream(buf);
Expand Down
10 changes: 3 additions & 7 deletions src/v/cloud_storage/tx_range_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,9 @@ remote_manifest_path generate_remote_tx_path(const remote_segment_path& path) {
}

tx_range_manifest::tx_range_manifest(
remote_segment_path spath, const std::vector<model::tx_range>& range)
: _path(std::move(spath)) {
for (const auto& tx : range) {
_ranges.push_back(tx);
}
_ranges.shrink_to_fit();
}
remote_segment_path spath, fragmented_vector<model::tx_range>&& range)
: _path(std::move(spath))
, _ranges(std::move(range)) {}

/// Builds a manifest from the segment path alone: `spath` is moved into
/// `_path`. This overload does not set `_ranges`, so that member keeps
/// whatever its default initialization yields.
tx_range_manifest::tx_range_manifest(remote_segment_path spath)
: _path(std::move(spath)) {}
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_storage/tx_range_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class tx_range_manifest final : public base_manifest {
public:
/// Create manifest for specific ntp
explicit tx_range_manifest(
remote_segment_path spath, const std::vector<model::tx_range>& range);
remote_segment_path spath, fragmented_vector<model::tx_range>&& range);

/// Create an empty manifest that is supposed to be updated later
explicit tx_range_manifest(remote_segment_path spath);
Expand Down
7 changes: 4 additions & 3 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "raft/types.h"
#include "storage/translating_reader.h"
#include "storage/types.h"
#include "utils/fragmented_vector.h"

#include <seastar/core/shared_ptr.hh>

Expand Down Expand Up @@ -211,11 +212,11 @@ class partition {

ss::shared_ptr<cluster::tm_stm> tm_stm() { return _tm_stm; }

ss::future<std::vector<rm_stm::tx_range>>
ss::future<fragmented_vector<rm_stm::tx_range>>
aborted_transactions(model::offset from, model::offset to) {
if (!_rm_stm) {
return ss::make_ready_future<std::vector<rm_stm::tx_range>>(
std::vector<rm_stm::tx_range>());
return ss::make_ready_future<fragmented_vector<rm_stm::tx_range>>(
fragmented_vector<rm_stm::tx_range>());
}
return _rm_stm->aborted_transactions(from, to);
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/persisted_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ model::offset persisted_stm::max_collectible_offset() {
return model::offset::max();
}

ss::future<std::vector<model::tx_range>>
ss::future<fragmented_vector<model::tx_range>>
persisted_stm::aborted_tx_ranges(model::offset, model::offset) {
return ss::make_ready_future<std::vector<model::tx_range>>();
return ss::make_ready_future<fragmented_vector<model::tx_range>>();
}

ss::future<> persisted_stm::wait_offset_committed(
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/persisted_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "storage/snapshot.h"
#include "storage/types.h"
#include "utils/expiring_promise.h"
#include "utils/fragmented_vector.h"
#include "utils/mutex.h"

#include <absl/container/flat_hash_map.h>
Expand Down Expand Up @@ -88,7 +89,7 @@ class persisted_stm
void make_snapshot_in_background() final;
ss::future<> ensure_snapshot_exists(model::offset) final;
model::offset max_collectible_offset() override;
ss::future<std::vector<model::tx_range>>
ss::future<fragmented_vector<model::tx_range>>
aborted_tx_ranges(model::offset, model::offset) override;
const ss::sstring& name() override { return _snapshot_mgr.name(); }

Expand Down
121 changes: 61 additions & 60 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ struct seq_entry_v0 {
struct tx_snapshot_v0 {
static constexpr uint8_t version = 0;

std::vector<model::producer_identity> fenced;
std::vector<rm_stm::tx_range> ongoing;
std::vector<rm_stm::prepare_marker> prepared;
std::vector<rm_stm::tx_range> aborted;
std::vector<rm_stm::abort_index> abort_indexes;
fragmented_vector<model::producer_identity> fenced;
fragmented_vector<rm_stm::tx_range> ongoing;
fragmented_vector<rm_stm::prepare_marker> prepared;
fragmented_vector<rm_stm::tx_range> aborted;
fragmented_vector<rm_stm::abort_index> abort_indexes;
model::offset offset;
std::vector<seq_entry_v0> seqs;
fragmented_vector<seq_entry_v0> seqs;
};

struct seq_cache_entry_v1 {
Expand All @@ -238,25 +238,25 @@ struct seq_entry_v1 {
struct tx_snapshot_v1 {
static constexpr uint8_t version = 1;

std::vector<model::producer_identity> fenced;
std::vector<rm_stm::tx_range> ongoing;
std::vector<rm_stm::prepare_marker> prepared;
std::vector<rm_stm::tx_range> aborted;
std::vector<rm_stm::abort_index> abort_indexes;
fragmented_vector<model::producer_identity> fenced;
fragmented_vector<rm_stm::tx_range> ongoing;
fragmented_vector<rm_stm::prepare_marker> prepared;
fragmented_vector<rm_stm::tx_range> aborted;
fragmented_vector<rm_stm::abort_index> abort_indexes;
model::offset offset;
std::vector<seq_entry_v1> seqs;
fragmented_vector<seq_entry_v1> seqs;
};

struct tx_snapshot_v2 {
static constexpr uint8_t version = 2;

std::vector<model::producer_identity> fenced;
std::vector<rm_stm::tx_range> ongoing;
std::vector<rm_stm::prepare_marker> prepared;
std::vector<rm_stm::tx_range> aborted;
std::vector<rm_stm::abort_index> abort_indexes;
fragmented_vector<model::producer_identity> fenced;
fragmented_vector<rm_stm::tx_range> ongoing;
fragmented_vector<rm_stm::prepare_marker> prepared;
fragmented_vector<rm_stm::tx_range> aborted;
fragmented_vector<rm_stm::abort_index> abort_indexes;
model::offset offset;
std::vector<rm_stm::seq_entry> seqs;
fragmented_vector<rm_stm::seq_entry> seqs;
};

rm_stm::rm_stm(
Expand Down Expand Up @@ -1883,8 +1883,8 @@ model::offset rm_stm::last_stable_offset() {
}

static void filter_intersecting(
std::vector<rm_stm::tx_range>& target,
const std::vector<rm_stm::tx_range>& source,
fragmented_vector<rm_stm::tx_range>& target,
const fragmented_vector<rm_stm::tx_range>& source,
model::offset from,
model::offset to) {
for (auto& range : source) {
Expand All @@ -1898,7 +1898,7 @@ static void filter_intersecting(
}
}

ss::future<std::vector<rm_stm::tx_range>>
ss::future<fragmented_vector<rm_stm::tx_range>>
rm_stm::aborted_transactions(model::offset from, model::offset to) {
return _state_lock.hold_read_lock().then(
[from, to, this](ss::basic_rwlock<>::holder unit) mutable {
Expand All @@ -1907,13 +1907,13 @@ rm_stm::aborted_transactions(model::offset from, model::offset to) {
});
}

ss::future<std::vector<rm_stm::tx_range>>
ss::future<fragmented_vector<rm_stm::tx_range>>
rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
std::vector<rm_stm::tx_range> result;
fragmented_vector<rm_stm::tx_range> result;
if (!_is_tx_enabled) {
co_return result;
}
std::vector<abort_index> intersecting_idxes;
fragmented_vector<abort_index> intersecting_idxes;
for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < from) {
continue;
Expand All @@ -1922,7 +1922,7 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
continue;
}
if (_log_state.last_abort_snapshot.match(idx)) {
auto opt = _log_state.last_abort_snapshot;
auto& opt = _log_state.last_abort_snapshot;
filter_intersecting(result, opt.aborted, from, to);
} else {
intersecting_idxes.push_back(idx);
Expand Down Expand Up @@ -1955,12 +1955,9 @@ void rm_stm::compact_snapshot() {
return;
}

std::vector<model::timestamp::type> lw_tss;
lw_tss.reserve(_log_state.seq_table.size());
for (auto it = _log_state.seq_table.cbegin();
it != _log_state.seq_table.cend();
it++) {
lw_tss.push_back(it->second.entry.last_write_timestamp);
fragmented_vector<model::timestamp::type> lw_tss;
for (const auto& it : _log_state.seq_table) {
lw_tss.push_back(it.second.entry.last_write_timestamp);
}
std::sort(lw_tss.begin(), lw_tss.end());
auto pivot = lw_tss[lw_tss.size() - 1 - _seq_table_min_size];
Expand Down Expand Up @@ -2034,7 +2031,7 @@ ss::future<> rm_stm::do_abort_old_txes() {
co_return;
}

std::vector<model::producer_identity> pids;
fragmented_vector<model::producer_identity> pids;
for (auto& [k, _] : _mem_state.estimated) {
pids.push_back(k);
}
Expand Down Expand Up @@ -2469,12 +2466,12 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
data.seqs = std::move(data_v2.seqs);

for (auto& entry : data_v2.prepared) {
data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = entry.tx_seq});
}

for (auto& entry : data_v2.seqs) {
data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = model::tx_seq(entry.seq)});
}
} else if (hdr.version == tx_snapshot_v1::version) {
Expand Down Expand Up @@ -2505,7 +2502,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
}

for (auto& entry : data_v1.prepared) {
data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = entry.tx_seq});
}
} else if (hdr.version == tx_snapshot_v0::version) {
Expand All @@ -2520,7 +2517,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
data.seqs.push_back(std::move(seq));
}
for (auto& entry : data_v0.prepared) {
data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = entry.tx_seq});
}
} else {
Expand All @@ -2538,10 +2535,11 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
for (auto& entry : data.prepared) {
_log_state.prepared.emplace(entry.pid, entry);
}
_log_state.aborted.insert(
_log_state.aborted.end(),
std::make_move_iterator(data.aborted.begin()),
std::make_move_iterator(data.aborted.end()));
for (auto it = std::make_move_iterator(data.aborted.begin());
it != std::make_move_iterator(data.aborted.end());
it++) {
_log_state.aborted.push_back(*it);
}
co_await ss::max_concurrent_for_each(
data.abort_indexes, 32, [this](const abort_index& idx) -> ss::future<> {
auto f_name = abort_idx_name(idx.first, idx.last);
Expand All @@ -2551,10 +2549,12 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
std::make_pair(idx.first, idx.last), snapshot_size);
});
});
_log_state.abort_indexes.insert(
_log_state.abort_indexes.end(),
std::make_move_iterator(data.abort_indexes.begin()),
std::make_move_iterator(data.abort_indexes.end()));

for (auto it = std::make_move_iterator(data.abort_indexes.begin());
it != std::make_move_iterator(data.abort_indexes.end());
it++) {
_log_state.abort_indexes.push_back(*it);
}
for (auto& entry : data.seqs) {
const auto pid = entry.pid;
auto it = _log_state.seq_table.find(pid);
Expand All @@ -2576,7 +2576,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
if (last.last > model::offset(0)) {
auto snapshot_opt = co_await load_abort_snapshot(last);
if (snapshot_opt) {
_log_state.last_abort_snapshot = snapshot_opt.value();
_log_state.last_abort_snapshot = std::move(snapshot_opt.value());
}
}

Expand All @@ -2596,13 +2596,13 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
// We need to fill in the order for idempotent requests. A pid belongs to an
// idempotent request if it is not inside fence_pid_epoch. To determine the
// order we only need to check last_write_timestamp, which contains the time
// of the last apply for a log record.
std::vector<log_state::seq_map::iterator> sorted_pids;
fragmented_vector<log_state::seq_map::iterator> sorted_pids;
for (auto it = _log_state.seq_table.begin();
it != _log_state.seq_table.end();
++it) {
if (!_log_state.fence_pid_epoch.contains(
it->second.entry.pid.get_id())) {
sorted_pids.emplace_back(it);
sorted_pids.push_back(it);
}
}

Expand Down Expand Up @@ -2677,22 +2677,21 @@ ss::future<> rm_stm::offload_aborted_txns() {
auto idx = abort_index{
.first = snapshot.first, .last = snapshot.last};
_log_state.abort_indexes.push_back(idx);
co_await save_abort_snapshot(snapshot);
co_await save_abort_snapshot(std::move(snapshot));
snapshot = abort_snapshot{
.first = model::offset::max(), .last = model::offset::min()};
}
}
_log_state.aborted = snapshot.aborted;
_log_state.aborted = std::move(snapshot.aborted);
}

// DO NOT coroutinize this method as it may cause issues on ARM:
// https://github.com/redpanda-data/redpanda/issues/6768
ss::future<stm_snapshot> rm_stm::take_snapshot() {
auto start_offset = _raft->start_offset();

std::vector<abort_index> abort_indexes;
std::vector<abort_index> expired_abort_indexes;
abort_indexes.reserve(_log_state.abort_indexes.size());
fragmented_vector<abort_index> abort_indexes;
fragmented_vector<abort_index> expired_abort_indexes;

for (const auto& idx : _log_state.abort_indexes) {
if (idx.last < start_offset) {
Expand All @@ -2712,7 +2711,8 @@ ss::future<stm_snapshot> rm_stm::take_snapshot() {
expired_abort_indexes.size(),
start_offset);
auto f = ss::do_with(
std::move(expired_abort_indexes), [this](std::vector<abort_index>& idxs) {
std::move(expired_abort_indexes),
[this](fragmented_vector<abort_index>& idxs) {
return ss::parallel_for_each(
idxs.begin(), idxs.end(), [this](const abort_index& idx) {
auto f_name = abort_idx_name(idx.first, idx.last);
Expand All @@ -2726,8 +2726,7 @@ ss::future<stm_snapshot> rm_stm::take_snapshot() {
});
});

std::vector<tx_range> aborted;
aborted.reserve(_log_state.aborted.size());
fragmented_vector<tx_range> aborted;
std::copy_if(
_log_state.aborted.begin(),
_log_state.aborted.end(),
Expand Down Expand Up @@ -2825,11 +2824,13 @@ uint64_t rm_stm::get_snapshot_size() const {
return persisted_stm::get_snapshot_size() + abort_snapshots_size;
}

ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
auto filename = abort_idx_name(snapshot.first, snapshot.last);
ss::future<> rm_stm::save_abort_snapshot(abort_snapshot&& snapshot) {
auto first_offset = snapshot.first;
auto last_offset = snapshot.last;
auto filename = abort_idx_name(first_offset, last_offset);
vlog(_ctx_log.debug, "saving abort snapshot {} at {}", snapshot, filename);
iobuf snapshot_data;
reflection::adl<abort_snapshot>{}.to(snapshot_data, snapshot);
reflection::adl<abort_snapshot>{}.to(snapshot_data, std::move(snapshot));
int32_t snapshot_size = snapshot_data.size_bytes();

auto writer = co_await _abort_snapshot_mgr.start_snapshot(filename);
Expand All @@ -2844,7 +2845,7 @@ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
uint64_t snapshot_disk_size
= co_await _abort_snapshot_mgr.get_snapshot_size(filename);
_abort_snapshot_sizes.emplace(
std::make_pair(snapshot.first, snapshot.last), snapshot_disk_size);
std::make_pair(first_offset, last_offset), snapshot_disk_size);
}

ss::future<std::optional<rm_stm::abort_snapshot>>
Expand Down Expand Up @@ -2968,7 +2969,7 @@ ss::future<> rm_stm::clear_old_tx_pids() {
co_return;
}

std::vector<model::producer_identity> pids_for_delete;
fragmented_vector<model::producer_identity> pids_for_delete;
for (auto [id, epoch] : _log_state.fence_pid_epoch) {
auto pid = model::producer_identity(id, epoch);
// If pid is not inside tx_seqs it means we do not have transaction for
Expand Down
Loading

0 comments on commit ef9769f

Please sign in to comment.