switch all vectors to frag vectors
bharathv committed Mar 15, 2023
1 parent f88655c commit d249ea0
Showing 15 changed files with 127 additions and 77 deletions.
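The commit swaps std::vector for fragmented_vector (utils/fragmented_vector.h) across the transaction and snapshot paths, presumably so that large collections of tx ranges and snapshot entries live in fixed-size fragments instead of one contiguous, repeatedly reallocated buffer. The toy class below only illustrates that idea; the fragment size and API are assumptions, not Redpanda's actual implementation.

    // Illustration only: a toy chunked vector in the spirit of
    // utils/fragmented_vector.h. Elements live in fixed-size fragments, so
    // growth never reallocates or copies the whole sequence and no single
    // large contiguous buffer is needed.
    #include <cstddef>
    #include <memory>
    #include <vector>

    template<typename T, std::size_t fragment_size = 1024>
    class toy_fragmented_vector {
    public:
        void push_back(T value) {
            if (_size % fragment_size == 0) {
                // start a new fragment instead of growing an existing buffer
                _fragments.push_back(std::make_unique<std::vector<T>>());
                _fragments.back()->reserve(fragment_size);
            }
            _fragments.back()->push_back(std::move(value));
            ++_size;
        }

        T& operator[](std::size_t i) {
            return (*_fragments[i / fragment_size])[i % fragment_size];
        }

        std::size_t size() const { return _size; }

    private:
        std::vector<std::unique_ptr<std::vector<T>>> _fragments;
        std::size_t _size{0};
    };

    int main() {
        toy_fragmented_vector<int> v;
        for (int i = 0; i < 5000; ++i) {
            v.push_back(i); // grows fragment by fragment
        }
        return v[4999] == 4999 ? 0 : 1;
    }
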
2 changes: 1 addition & 1 deletion src/v/archival/ntp_archiver_service.cc
@@ -907,7 +907,7 @@ ss::future<cloud_storage::upload_result> ntp_archiver::upload_tx(

auto path = segment_path_for_candidate(candidate);

- cloud_storage::tx_range_manifest manifest(path, tx_range);
+ cloud_storage::tx_range_manifest manifest(path, std::move(tx_range));

co_return co_await _remote.upload_manifest(
get_bucket_name(), manifest, fib, _tx_tags);

4 changes: 3 additions & 1 deletion src/v/cloud_storage/tests/tx_range_manifest_test.cc
@@ -78,7 +78,9 @@ SEASTAR_THREAD_TEST_CASE(empty_serialization_roundtrip_test) {
}

SEASTAR_THREAD_TEST_CASE(serialization_roundtrip_test) {
- tx_range_manifest m(segment_path, ranges);
+ fragmented_vector<tx_range_t> tx_ranges;
+ tx_ranges = ranges;
+ tx_range_manifest m(segment_path, std::move(tx_ranges));
auto [is, size] = m.serialize().get();
iobuf buf;
auto os = make_iobuf_ref_output_stream(buf);

10 changes: 3 additions & 7 deletions src/v/cloud_storage/tx_range_manifest.cc
@@ -32,13 +32,9 @@ remote_manifest_path generate_remote_tx_path(const remote_segment_path& path) {
}

tx_range_manifest::tx_range_manifest(
- remote_segment_path spath, const std::vector<model::tx_range>& range)
- : _path(std::move(spath)) {
- for (const auto& tx : range) {
- _ranges.push_back(tx);
- }
- _ranges.shrink_to_fit();
- }
+ remote_segment_path spath, fragmented_vector<model::tx_range>&& range)
+ : _path(std::move(spath))
+ , _ranges(std::move(range)) {}

tx_range_manifest::tx_range_manifest(remote_segment_path spath)
: _path(std::move(spath)) {}

2 changes: 1 addition & 1 deletion src/v/cloud_storage/tx_range_manifest.h
@@ -29,7 +29,7 @@ class tx_range_manifest final : public base_manifest {
public:
/// Create manifest for specific ntp
explicit tx_range_manifest(
- remote_segment_path spath, const std::vector<model::tx_range>& range);
+ remote_segment_path spath, fragmented_vector<model::tx_range>&& range);

/// Create empty manifest that supposed to be updated later
explicit tx_range_manifest(remote_segment_path spath);
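Both the definition above (tx_range_manifest.cc) and this declaration turn the ranges parameter into a move sink: the old const std::vector& was copied element by element and then shrink_to_fit, while the new fragmented_vector&& is moved straight into _ranges. A minimal standalone sketch of that pattern, using stand-in types rather than the real cloud_storage classes:

    // Sketch of the constructor change: accept the container by rvalue
    // reference and move it into the member instead of copying each element
    // from a const reference. Types are stand-ins, not the real
    // cloud_storage::tx_range_manifest.
    #include <string>
    #include <utility>
    #include <vector>

    struct tx_range_like {
        long first;
        long last;
    };

    class manifest_like {
    public:
        manifest_like(std::string path, std::vector<tx_range_like>&& ranges)
          : _path(std::move(path))
          , _ranges(std::move(ranges)) {} // ownership transferred, no copy loop

    private:
        std::string _path;
        std::vector<tx_range_like> _ranges;
    };

    int main() {
        std::vector<tx_range_like> ranges{{0, 10}, {11, 20}};
        // Callers now spell out the handover, as upload_tx does with
        // std::move(tx_range).
        manifest_like m("segment/path.bin", std::move(ranges));
        (void)m; // silence unused-variable warnings
        return 0;
    }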

7 changes: 4 additions & 3 deletions src/v/cluster/partition.h
@@ -33,6 +33,7 @@
#include "raft/types.h"
#include "storage/translating_reader.h"
#include "storage/types.h"
#include "utils/fragmented_vector.h"

#include <seastar/core/shared_ptr.hh>

@@ -211,11 +212,11 @@ class partition {

ss::shared_ptr<cluster::tm_stm> tm_stm() { return _tm_stm; }

- ss::future<std::vector<rm_stm::tx_range>>
+ ss::future<fragmented_vector<rm_stm::tx_range>>
aborted_transactions(model::offset from, model::offset to) {
if (!_rm_stm) {
- return ss::make_ready_future<std::vector<rm_stm::tx_range>>(
- std::vector<rm_stm::tx_range>());
+ return ss::make_ready_future<fragmented_vector<rm_stm::tx_range>>(
+ fragmented_vector<rm_stm::tx_range>());
}
return _rm_stm->aborted_transactions(from, to);
}
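Callers that only range-iterate the result, such as filter_intersecting in rm_stm.cc further down, are unaffected by the container swap, since both std::vector and fragmented_vector support begin()/end(). A standalone sketch with stand-in types (std::deque plays the non-contiguous container, tx_range_like plays rm_stm::tx_range):

    // Sketch: code that only range-iterates the returned container is
    // agnostic to whether it is std::vector or a fragmented vector.
    #include <deque>
    #include <iostream>
    #include <vector>

    struct tx_range_like {
        long first;
        long last;
    };

    template<typename Ranges>
    long count_intersecting(const Ranges& ranges, long from, long to) {
        long n = 0;
        for (const auto& r : ranges) {
            if (r.last >= from && r.first <= to) {
                ++n;
            }
        }
        return n;
    }

    int main() {
        std::vector<tx_range_like> contiguous{{0, 5}, {10, 20}};
        std::deque<tx_range_like> fragmented{{0, 5}, {10, 20}};
        std::cout << count_intersecting(contiguous, 3, 12) << "\n"; // 2
        std::cout << count_intersecting(fragmented, 3, 12) << "\n"; // 2
        return 0;
    }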

4 changes: 2 additions & 2 deletions src/v/cluster/persisted_stm.cc
@@ -185,9 +185,9 @@ model::offset persisted_stm::max_collectible_offset() {
return model::offset::max();
}

- ss::future<std::vector<model::tx_range>>
+ ss::future<fragmented_vector<model::tx_range>>
persisted_stm::aborted_tx_ranges(model::offset, model::offset) {
- return ss::make_ready_future<std::vector<model::tx_range>>();
+ return ss::make_ready_future<fragmented_vector<model::tx_range>>();
}

ss::future<> persisted_stm::wait_offset_committed(

3 changes: 2 additions & 1 deletion src/v/cluster/persisted_stm.h
@@ -22,6 +22,7 @@
#include "storage/snapshot.h"
#include "storage/types.h"
#include "utils/expiring_promise.h"
#include "utils/fragmented_vector.h"
#include "utils/mutex.h"

#include <absl/container/flat_hash_map.h>
@@ -88,7 +89,7 @@ class persisted_stm
void make_snapshot_in_background() final;
ss::future<> ensure_snapshot_exists(model::offset) final;
model::offset max_collectible_offset() override;
- ss::future<std::vector<model::tx_range>>
+ ss::future<fragmented_vector<model::tx_range>>
aborted_tx_ranges(model::offset, model::offset) override;
const ss::sstring& name() override { return _snapshot_mgr.name(); }


79 changes: 42 additions & 37 deletions src/v/cluster/rm_stm.cc
@@ -238,25 +238,25 @@ struct seq_entry_v1 {
struct tx_snapshot_v1 {
static constexpr uint8_t version = 1;

- std::vector<model::producer_identity> fenced;
- std::vector<rm_stm::tx_range> ongoing;
- std::vector<rm_stm::prepare_marker> prepared;
- std::vector<rm_stm::tx_range> aborted;
- std::vector<rm_stm::abort_index> abort_indexes;
+ fragmented_vector<model::producer_identity> fenced;
+ fragmented_vector<rm_stm::tx_range> ongoing;
+ fragmented_vector<rm_stm::prepare_marker> prepared;
+ fragmented_vector<rm_stm::tx_range> aborted;
+ fragmented_vector<rm_stm::abort_index> abort_indexes;
model::offset offset;
- std::vector<seq_entry_v1> seqs;
+ fragmented_vector<seq_entry_v1> seqs;
};

struct tx_snapshot_v2 {
static constexpr uint8_t version = 2;

- std::vector<model::producer_identity> fenced;
- std::vector<rm_stm::tx_range> ongoing;
- std::vector<rm_stm::prepare_marker> prepared;
- std::vector<rm_stm::tx_range> aborted;
- std::vector<rm_stm::abort_index> abort_indexes;
+ fragmented_vector<model::producer_identity> fenced;
+ fragmented_vector<rm_stm::tx_range> ongoing;
+ fragmented_vector<rm_stm::prepare_marker> prepared;
+ fragmented_vector<rm_stm::tx_range> aborted;
+ fragmented_vector<rm_stm::abort_index> abort_indexes;
model::offset offset;
- std::vector<rm_stm::seq_entry> seqs;
+ fragmented_vector<rm_stm::seq_entry> seqs;
};

rm_stm::rm_stm(
@@ -1883,8 +1883,8 @@ model::offset rm_stm::last_stable_offset() {
}

static void filter_intersecting(
- std::vector<rm_stm::tx_range>& target,
- const std::vector<rm_stm::tx_range>& source,
+ fragmented_vector<rm_stm::tx_range>& target,
+ const fragmented_vector<rm_stm::tx_range>& source,
model::offset from,
model::offset to) {
for (auto& range : source) {
@@ -1898,7 +1898,7 @@ static void filter_intersecting(
}
}

- ss::future<std::vector<rm_stm::tx_range>>
+ ss::future<fragmented_vector<rm_stm::tx_range>>
rm_stm::aborted_transactions(model::offset from, model::offset to) {
return _state_lock.hold_read_lock().then(
[from, to, this](ss::basic_rwlock<>::holder unit) mutable {
@@ -1907,9 +1907,9 @@ rm_stm::aborted_transactions(model::offset from, model::offset to) {
});
}

- ss::future<std::vector<rm_stm::tx_range>>
+ ss::future<fragmented_vector<rm_stm::tx_range>>
rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
- std::vector<rm_stm::tx_range> result;
+ fragmented_vector<rm_stm::tx_range> result;
if (!_is_tx_enabled) {
co_return result;
}
@@ -1922,7 +1922,7 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
continue;
}
if (_log_state.last_abort_snapshot.match(idx)) {
- auto opt = _log_state.last_abort_snapshot;
+ auto& opt = _log_state.last_abort_snapshot;
filter_intersecting(result, opt.aborted, from, to);
} else {
intersecting_idxes.push_back(idx);
@@ -2469,12 +2469,12 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
data.seqs = std::move(data_v2.seqs);

for (auto& entry : data_v2.prepared) {
- data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
+ data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = entry.tx_seq});
}

for (auto& entry : data_v2.seqs) {
- data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
+ data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = model::tx_seq(entry.seq)});
}
} else if (hdr.version == tx_snapshot_v1::version) {
@@ -2505,7 +2505,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
}

for (auto& entry : data_v1.prepared) {
- data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
+ data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = entry.tx_seq});
}
} else if (hdr.version == tx_snapshot_v0::version) {
@@ -2520,7 +2520,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
data.seqs.push_back(std::move(seq));
}
for (auto& entry : data_v0.prepared) {
- data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{
+ data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{
.pid = entry.pid, .tx_seq = entry.tx_seq});
}
} else {
@@ -2538,10 +2538,11 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
for (auto& entry : data.prepared) {
_log_state.prepared.emplace(entry.pid, entry);
}
- _log_state.aborted.insert(
- _log_state.aborted.end(),
- std::make_move_iterator(data.aborted.begin()),
- std::make_move_iterator(data.aborted.end()));
+ for (auto it = std::make_move_iterator(data.aborted.begin());
+ it != std::make_move_iterator(data.aborted.end());
+ it++) {
+ _log_state.aborted.push_back(*it);
+ }
co_await ss::max_concurrent_for_each(
data.abort_indexes, 32, [this](const abort_index& idx) -> ss::future<> {
auto f_name = abort_idx_name(idx.first, idx.last);
@@ -2551,10 +2552,12 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
std::make_pair(idx.first, idx.last), snapshot_size);
});
});
- _log_state.abort_indexes.insert(
- _log_state.abort_indexes.end(),
- std::make_move_iterator(data.abort_indexes.begin()),
- std::make_move_iterator(data.abort_indexes.end()));
+
+ for (auto it = std::make_move_iterator(data.abort_indexes.begin());
+ it != std::make_move_iterator(data.abort_indexes.end());
+ it++) {
+ _log_state.abort_indexes.push_back(*it);
+ }
for (auto& entry : data.seqs) {
const auto pid = entry.pid;
auto it = _log_state.seq_table.find(pid);
@@ -2576,7 +2579,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
if (last.last > model::offset(0)) {
auto snapshot_opt = co_await load_abort_snapshot(last);
if (snapshot_opt) {
- _log_state.last_abort_snapshot = snapshot_opt.value();
+ _log_state.last_abort_snapshot = std::move(snapshot_opt.value());
}
}

@@ -2677,12 +2680,12 @@ ss::future<> rm_stm::offload_aborted_txns() {
auto idx = abort_index{
.first = snapshot.first, .last = snapshot.last};
_log_state.abort_indexes.push_back(idx);
- co_await save_abort_snapshot(snapshot);
+ co_await save_abort_snapshot(std::move(snapshot));
snapshot = abort_snapshot{
.first = model::offset::max(), .last = model::offset::min()};
}
}
- _log_state.aborted = snapshot.aborted;
+ _log_state.aborted = std::move(snapshot.aborted);
}

// DO NOT coroutinize this method as it may cause issues on ARM:
@@ -2825,11 +2828,13 @@ uint64_t rm_stm::get_snapshot_size() const {
return persisted_stm::get_snapshot_size() + abort_snapshots_size;
}

- ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
- auto filename = abort_idx_name(snapshot.first, snapshot.last);
+ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot&& snapshot) {
+ auto first_offset = snapshot.first;
+ auto last_offset = snapshot.last;
+ auto filename = abort_idx_name(first_offset, last_offset);
vlog(_ctx_log.debug, "saving abort snapshot {} at {}", snapshot, filename);
iobuf snapshot_data;
- reflection::adl<abort_snapshot>{}.to(snapshot_data, snapshot);
+ reflection::adl<abort_snapshot>{}.to(snapshot_data, std::move(snapshot));
int32_t snapshot_size = snapshot_data.size_bytes();

auto writer = co_await _abort_snapshot_mgr.start_snapshot(filename);
@@ -2844,7 +2849,7 @@ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
uint64_t snapshot_disk_size
= co_await _abort_snapshot_mgr.get_snapshot_size(filename);
_abort_snapshot_sizes.emplace(
- std::make_pair(snapshot.first, snapshot.last), snapshot_disk_size);
+ std::make_pair(first_offset, last_offset), snapshot_disk_size);
}

ss::future<std::optional<rm_stm::abort_snapshot>>
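A side effect visible in apply_snapshot above: appending the snapshot's aborted ranges and abort indexes no longer goes through std::vector's iterator-range insert; the code now walks move iterators and push_backs one element at a time, which suggests (an assumption, the container header is not part of this diff) that fragmented_vector lacks that insert overload. A standalone sketch of the same move-append pattern, with std::deque standing in for the destination:

    // Sketch of the move-append loop used in apply_snapshot: when the
    // destination has push_back but not std::vector's iterator-range insert,
    // elements are moved across one at a time.
    #include <deque>
    #include <iterator>
    #include <string>
    #include <vector>

    template<typename Dst, typename Src>
    void move_append(Dst& dst, Src& src) {
        for (auto it = std::make_move_iterator(src.begin());
             it != std::make_move_iterator(src.end());
             ++it) {
            dst.push_back(*it); // *it is an rvalue, so push_back moves
        }
    }

    int main() {
        std::vector<std::string> from_snapshot{"abort-0-10", "abort-11-20"};
        std::deque<std::string> log_state;
        move_append(log_state, from_snapshot);
        // from_snapshot's strings are now moved-from; log_state owns the data.
        return log_state.size() == 2 ? 0 : 1;
    }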

34 changes: 18 additions & 16 deletions src/v/cluster/rm_stm.h
@@ -26,6 +26,7 @@
#include "storage/snapshot.h"
#include "utils/available_promise.h"
#include "utils/expiring_promise.h"
#include "utils/fragmented_vector.h"
#include "utils/mutex.h"
#include "utils/prefix_logger.h"
#include "utils/tracking_allocator.h"
@@ -129,13 +130,13 @@ class rm_stm final : public persisted_stm {
struct tx_snapshot {
static constexpr uint8_t version = 3;

- std::vector<model::producer_identity> fenced;
- std::vector<tx_range> ongoing;
- std::vector<prepare_marker> prepared;
- std::vector<tx_range> aborted;
- std::vector<abort_index> abort_indexes;
+ fragmented_vector<model::producer_identity> fenced;
+ fragmented_vector<tx_range> ongoing;
+ fragmented_vector<prepare_marker> prepared;
+ fragmented_vector<tx_range> aborted;
+ fragmented_vector<abort_index> abort_indexes;
model::offset offset;
- std::vector<seq_entry> seqs;
+ fragmented_vector<seq_entry> seqs;

struct tx_seqs_snapshot {
model::producer_identity pid;
@@ -147,14 +148,14 @@ class rm_stm final : public persisted_stm {
duration_type timeout;
};

- std::vector<tx_seqs_snapshot> tx_seqs;
- std::vector<expiration_snapshot> expiration;
+ fragmented_vector<tx_seqs_snapshot> tx_seqs;
+ fragmented_vector<expiration_snapshot> expiration;
};

struct abort_snapshot {
model::offset first;
model::offset last;
- std::vector<tx_range> aborted;
+ fragmented_vector<tx_range> aborted;

bool match(abort_index idx) {
return idx.first == first && idx.last == last;
@@ -187,7 +188,7 @@ class rm_stm final : public persisted_stm {
model::producer_identity, model::tx_seq, model::timeout_clock::duration);

model::offset last_stable_offset();
- ss::future<std::vector<rm_stm::tx_range>>
+ ss::future<fragmented_vector<rm_stm::tx_range>>
aborted_transactions(model::offset, model::offset);

model::offset max_collectible_offset() override {
@@ -198,7 +199,7 @@ class rm_stm final : public persisted_stm {
return storage::stm_type::transactional;
}

- ss::future<std::vector<model::tx_range>>
+ ss::future<fragmented_vector<model::tx_range>>
aborted_tx_ranges(model::offset from, model::offset to) override {
return aborted_transactions(from, to);
}
@@ -297,7 +298,7 @@ class rm_stm final : public persisted_stm {
private:
void setup_metrics();
ss::future<> do_remove_persistent_state();
- ss::future<std::vector<rm_stm::tx_range>>
+ ss::future<fragmented_vector<rm_stm::tx_range>>

do_aborted_transactions(model::offset, model::offset);
ss::future<checked<model::term_id, tx_errc>> do_begin_tx(
@@ -317,7 +318,7 @@ class rm_stm final : public persisted_stm {
ss::future<> apply_snapshot(stm_snapshot_header, iobuf&&) override;
ss::future<stm_snapshot> take_snapshot() override;
ss::future<std::optional<abort_snapshot>> load_abort_snapshot(abort_index);
- ss::future<> save_abort_snapshot(abort_snapshot);
+ ss::future<> save_abort_snapshot(abort_snapshot&&);

bool check_seq(model::batch_identity, model::term_id);
std::optional<kafka::offset> known_seq(model::batch_identity) const;
@@ -472,8 +473,8 @@ class rm_stm final : public persisted_stm {
model::producer_identity,
prepare_marker>
prepared;
- std::vector<tx_range> aborted;
- std::vector<abort_index> abort_indexes;
+ fragmented_vector<tx_range> aborted;
+ fragmented_vector<abort_index> abort_indexes;
abort_snapshot last_abort_snapshot{.last = model::offset(-1)};
// the only piece of data which we update on replay and before
// replicating the command. we use the highest seq number to resolve
@@ -659,7 +660,8 @@ class rm_stm final : public persisted_stm {
int32_t last_seq{-1};
result<kafka_result> r = errc::success;
bool is_processing;
- std::vector<ss::lw_shared_ptr<available_promise<result<kafka_result>>>>
+ fragmented_vector<
+ ss::lw_shared_ptr<available_promise<result<kafka_result>>>>
parked;
};
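save_abort_snapshot now takes its argument by rvalue reference (declared above, defined in rm_stm.cc earlier in the diff). The definition copies snapshot.first and snapshot.last into locals before the snapshot is moved into the serializer, so the later _abort_snapshot_sizes.emplace works on the copies rather than on a moved-from object. A minimal sketch of that copy-before-move discipline, with illustrative types:

    // Sketch of the copy-before-move discipline in save_abort_snapshot:
    // values still needed after the payload is consumed (the first/last
    // offsets used as a map key) are copied to locals before std::move.
    #include <cstddef>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    struct snapshot_like {
        long first;
        long last;
        std::vector<std::string> aborted;
    };

    std::map<std::pair<long, long>, std::size_t> snapshot_sizes;

    std::size_t serialize(snapshot_like&& s) {
        // stand-in for the serializer that consumes the snapshot
        auto consumed = std::move(s.aborted);
        return consumed.size();
    }

    void save_snapshot(snapshot_like&& snapshot) {
        auto first = snapshot.first; // copied before the move below
        auto last = snapshot.last;
        auto size = serialize(std::move(snapshot));
        // safe: uses the copies, never the moved-from snapshot
        snapshot_sizes.emplace(std::make_pair(first, last), size);
    }

    int main() {
        save_snapshot(snapshot_like{0, 10, {"tx-1", "tx-2"}});
        return snapshot_sizes.count({0, 10}) == 1 ? 0 : 1;
    }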
