diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 54ee34595f19..652012c4a2ca 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -907,7 +907,7 @@ ss::future ntp_archiver::upload_tx( auto path = segment_path_for_candidate(candidate); - cloud_storage::tx_range_manifest manifest(path, tx_range); + cloud_storage::tx_range_manifest manifest(path, std::move(tx_range)); co_return co_await _remote.upload_manifest( get_bucket_name(), manifest, fib, _tx_tags); diff --git a/src/v/cloud_storage/tests/tx_range_manifest_test.cc b/src/v/cloud_storage/tests/tx_range_manifest_test.cc index 7b698984427e..e7d3fb7e5dea 100644 --- a/src/v/cloud_storage/tests/tx_range_manifest_test.cc +++ b/src/v/cloud_storage/tests/tx_range_manifest_test.cc @@ -78,7 +78,9 @@ SEASTAR_THREAD_TEST_CASE(empty_serialization_roundtrip_test) { } SEASTAR_THREAD_TEST_CASE(serialization_roundtrip_test) { - tx_range_manifest m(segment_path, ranges); + fragmented_vector tx_ranges; + tx_ranges = ranges; + tx_range_manifest m(segment_path, std::move(tx_ranges)); auto [is, size] = m.serialize().get(); iobuf buf; auto os = make_iobuf_ref_output_stream(buf); diff --git a/src/v/cloud_storage/tx_range_manifest.cc b/src/v/cloud_storage/tx_range_manifest.cc index 068c982e6709..4dc7591b48c4 100644 --- a/src/v/cloud_storage/tx_range_manifest.cc +++ b/src/v/cloud_storage/tx_range_manifest.cc @@ -32,13 +32,9 @@ remote_manifest_path generate_remote_tx_path(const remote_segment_path& path) { } tx_range_manifest::tx_range_manifest( - remote_segment_path spath, const std::vector& range) - : _path(std::move(spath)) { - for (const auto& tx : range) { - _ranges.push_back(tx); - } - _ranges.shrink_to_fit(); -} + remote_segment_path spath, fragmented_vector&& range) + : _path(std::move(spath)) + , _ranges(std::move(range)) {} tx_range_manifest::tx_range_manifest(remote_segment_path spath) : _path(std::move(spath)) {} diff --git a/src/v/cloud_storage/tx_range_manifest.h b/src/v/cloud_storage/tx_range_manifest.h index 8ef96e05da2a..1ed2c9dc2ccc 100644 --- a/src/v/cloud_storage/tx_range_manifest.h +++ b/src/v/cloud_storage/tx_range_manifest.h @@ -29,7 +29,7 @@ class tx_range_manifest final : public base_manifest { public: /// Create manifest for specific ntp explicit tx_range_manifest( - remote_segment_path spath, const std::vector& range); + remote_segment_path spath, fragmented_vector&& range); /// Create empty manifest that supposed to be updated later explicit tx_range_manifest(remote_segment_path spath); diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 5c287eeeef2e..e5265f14558c 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -33,6 +33,7 @@ #include "raft/types.h" #include "storage/translating_reader.h" #include "storage/types.h" +#include "utils/fragmented_vector.h" #include @@ -211,11 +212,11 @@ class partition { ss::shared_ptr tm_stm() { return _tm_stm; } - ss::future> + ss::future> aborted_transactions(model::offset from, model::offset to) { if (!_rm_stm) { - return ss::make_ready_future>( - std::vector()); + return ss::make_ready_future>( + fragmented_vector()); } return _rm_stm->aborted_transactions(from, to); } diff --git a/src/v/cluster/persisted_stm.cc b/src/v/cluster/persisted_stm.cc index 0fa189448b76..1ceb1878417c 100644 --- a/src/v/cluster/persisted_stm.cc +++ b/src/v/cluster/persisted_stm.cc @@ -185,9 +185,9 @@ model::offset persisted_stm::max_collectible_offset() { return model::offset::max(); } -ss::future> +ss::future> persisted_stm::aborted_tx_ranges(model::offset, model::offset) { - return ss::make_ready_future>(); + return ss::make_ready_future>(); } ss::future<> persisted_stm::wait_offset_committed( diff --git a/src/v/cluster/persisted_stm.h b/src/v/cluster/persisted_stm.h index 7895be609040..2c9853b40145 100644 --- a/src/v/cluster/persisted_stm.h +++ b/src/v/cluster/persisted_stm.h @@ -22,6 +22,7 @@ #include "storage/snapshot.h" #include "storage/types.h" #include "utils/expiring_promise.h" +#include "utils/fragmented_vector.h" #include "utils/mutex.h" #include @@ -88,7 +89,7 @@ class persisted_stm void make_snapshot_in_background() final; ss::future<> ensure_snapshot_exists(model::offset) final; model::offset max_collectible_offset() override; - ss::future> + ss::future> aborted_tx_ranges(model::offset, model::offset) override; const ss::sstring& name() override { return _snapshot_mgr.name(); } diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 63f785837fa9..7e0a8275e314 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -213,13 +213,13 @@ struct seq_entry_v0 { struct tx_snapshot_v0 { static constexpr uint8_t version = 0; - std::vector fenced; - std::vector ongoing; - std::vector prepared; - std::vector aborted; - std::vector abort_indexes; + fragmented_vector fenced; + fragmented_vector ongoing; + fragmented_vector prepared; + fragmented_vector aborted; + fragmented_vector abort_indexes; model::offset offset; - std::vector seqs; + fragmented_vector seqs; }; struct seq_cache_entry_v1 { @@ -238,25 +238,25 @@ struct seq_entry_v1 { struct tx_snapshot_v1 { static constexpr uint8_t version = 1; - std::vector fenced; - std::vector ongoing; - std::vector prepared; - std::vector aborted; - std::vector abort_indexes; + fragmented_vector fenced; + fragmented_vector ongoing; + fragmented_vector prepared; + fragmented_vector aborted; + fragmented_vector abort_indexes; model::offset offset; - std::vector seqs; + fragmented_vector seqs; }; struct tx_snapshot_v2 { static constexpr uint8_t version = 2; - std::vector fenced; - std::vector ongoing; - std::vector prepared; - std::vector aborted; - std::vector abort_indexes; + fragmented_vector fenced; + fragmented_vector ongoing; + fragmented_vector prepared; + fragmented_vector aborted; + fragmented_vector abort_indexes; model::offset offset; - std::vector seqs; + fragmented_vector seqs; }; rm_stm::rm_stm( @@ -1883,8 +1883,8 @@ model::offset rm_stm::last_stable_offset() { } static void filter_intersecting( - std::vector& target, - const std::vector& source, + fragmented_vector& target, + const fragmented_vector& source, model::offset from, model::offset to) { for (auto& range : source) { @@ -1898,7 +1898,7 @@ static void filter_intersecting( } } -ss::future> +ss::future> rm_stm::aborted_transactions(model::offset from, model::offset to) { return _state_lock.hold_read_lock().then( [from, to, this](ss::basic_rwlock<>::holder unit) mutable { @@ -1907,13 +1907,13 @@ rm_stm::aborted_transactions(model::offset from, model::offset to) { }); } -ss::future> +ss::future> rm_stm::do_aborted_transactions(model::offset from, model::offset to) { - std::vector result; + fragmented_vector result; if (!_is_tx_enabled) { co_return result; } - std::vector intersecting_idxes; + fragmented_vector intersecting_idxes; for (const auto& idx : _log_state.abort_indexes) { if (idx.last < from) { continue; @@ -1922,7 +1922,7 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) { continue; } if (_log_state.last_abort_snapshot.match(idx)) { - auto opt = _log_state.last_abort_snapshot; + auto& opt = _log_state.last_abort_snapshot; filter_intersecting(result, opt.aborted, from, to); } else { intersecting_idxes.push_back(idx); @@ -1955,12 +1955,9 @@ void rm_stm::compact_snapshot() { return; } - std::vector lw_tss; - lw_tss.reserve(_log_state.seq_table.size()); - for (auto it = _log_state.seq_table.cbegin(); - it != _log_state.seq_table.cend(); - it++) { - lw_tss.push_back(it->second.entry.last_write_timestamp); + fragmented_vector lw_tss; + for (const auto& it : _log_state.seq_table) { + lw_tss.push_back(it.second.entry.last_write_timestamp); } std::sort(lw_tss.begin(), lw_tss.end()); auto pivot = lw_tss[lw_tss.size() - 1 - _seq_table_min_size]; @@ -2034,7 +2031,7 @@ ss::future<> rm_stm::do_abort_old_txes() { co_return; } - std::vector pids; + fragmented_vector pids; for (auto& [k, _] : _mem_state.estimated) { pids.push_back(k); } @@ -2469,12 +2466,13 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { data.seqs = std::move(data_v2.seqs); for (auto& entry : data_v2.prepared) { - data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{ + data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{ .pid = entry.pid, .tx_seq = entry.tx_seq}); } + // TODO: https://github.com/redpanda-data/redpanda/issues/9508 for (auto& entry : data_v2.seqs) { - data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{ + data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{ .pid = entry.pid, .tx_seq = model::tx_seq(entry.seq)}); } } else if (hdr.version == tx_snapshot_v1::version) { @@ -2505,7 +2503,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { } for (auto& entry : data_v1.prepared) { - data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{ + data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{ .pid = entry.pid, .tx_seq = entry.tx_seq}); } } else if (hdr.version == tx_snapshot_v0::version) { @@ -2520,7 +2518,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { data.seqs.push_back(std::move(seq)); } for (auto& entry : data_v0.prepared) { - data.tx_seqs.emplace_back(tx_snapshot::tx_seqs_snapshot{ + data.tx_seqs.push_back(tx_snapshot::tx_seqs_snapshot{ .pid = entry.pid, .tx_seq = entry.tx_seq}); } } else { @@ -2538,10 +2536,11 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { for (auto& entry : data.prepared) { _log_state.prepared.emplace(entry.pid, entry); } - _log_state.aborted.insert( - _log_state.aborted.end(), - std::make_move_iterator(data.aborted.begin()), - std::make_move_iterator(data.aborted.end())); + for (auto it = std::make_move_iterator(data.aborted.begin()); + it != std::make_move_iterator(data.aborted.end()); + it++) { + _log_state.aborted.push_back(*it); + } co_await ss::max_concurrent_for_each( data.abort_indexes, 32, [this](const abort_index& idx) -> ss::future<> { auto f_name = abort_idx_name(idx.first, idx.last); @@ -2551,10 +2550,12 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { std::make_pair(idx.first, idx.last), snapshot_size); }); }); - _log_state.abort_indexes.insert( - _log_state.abort_indexes.end(), - std::make_move_iterator(data.abort_indexes.begin()), - std::make_move_iterator(data.abort_indexes.end())); + + for (auto it = std::make_move_iterator(data.abort_indexes.begin()); + it != std::make_move_iterator(data.abort_indexes.end()); + it++) { + _log_state.abort_indexes.push_back(*it); + } for (auto& entry : data.seqs) { const auto pid = entry.pid; auto it = _log_state.seq_table.find(pid); @@ -2576,7 +2577,7 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { if (last.last > model::offset(0)) { auto snapshot_opt = co_await load_abort_snapshot(last); if (snapshot_opt) { - _log_state.last_abort_snapshot = snapshot_opt.value(); + _log_state.last_abort_snapshot = std::move(snapshot_opt.value()); } } @@ -2596,13 +2597,13 @@ rm_stm::apply_snapshot(stm_snapshot_header hdr, iobuf&& tx_ss_buf) { // We need to fill order for idempotent requests. So pid is from idempotent // request if it is not inside fence_pid_epoch. For order we just need to // check last_write_timestamp. It contains time last apply for log record. - std::vector sorted_pids; + fragmented_vector sorted_pids; for (auto it = _log_state.seq_table.begin(); it != _log_state.seq_table.end(); ++it) { if (!_log_state.fence_pid_epoch.contains( it->second.entry.pid.get_id())) { - sorted_pids.emplace_back(it); + sorted_pids.push_back(it); } } @@ -2677,12 +2678,12 @@ ss::future<> rm_stm::offload_aborted_txns() { auto idx = abort_index{ .first = snapshot.first, .last = snapshot.last}; _log_state.abort_indexes.push_back(idx); - co_await save_abort_snapshot(snapshot); + co_await save_abort_snapshot(std::move(snapshot)); snapshot = abort_snapshot{ .first = model::offset::max(), .last = model::offset::min()}; } } - _log_state.aborted = snapshot.aborted; + _log_state.aborted = std::move(snapshot.aborted); } // DO NOT coroutinize this method as it may cause issues on ARM: @@ -2690,9 +2691,8 @@ ss::future<> rm_stm::offload_aborted_txns() { ss::future rm_stm::take_snapshot() { auto start_offset = _raft->start_offset(); - std::vector abort_indexes; - std::vector expired_abort_indexes; - abort_indexes.reserve(_log_state.abort_indexes.size()); + fragmented_vector abort_indexes; + fragmented_vector expired_abort_indexes; for (const auto& idx : _log_state.abort_indexes) { if (idx.last < start_offset) { @@ -2712,7 +2712,8 @@ ss::future rm_stm::take_snapshot() { expired_abort_indexes.size(), start_offset); auto f = ss::do_with( - std::move(expired_abort_indexes), [this](std::vector& idxs) { + std::move(expired_abort_indexes), + [this](fragmented_vector& idxs) { return ss::parallel_for_each( idxs.begin(), idxs.end(), [this](const abort_index& idx) { auto f_name = abort_idx_name(idx.first, idx.last); @@ -2726,8 +2727,7 @@ ss::future rm_stm::take_snapshot() { }); }); - std::vector aborted; - aborted.reserve(_log_state.aborted.size()); + fragmented_vector aborted; std::copy_if( _log_state.aborted.begin(), _log_state.aborted.end(), @@ -2825,11 +2825,13 @@ uint64_t rm_stm::get_snapshot_size() const { return persisted_stm::get_snapshot_size() + abort_snapshots_size; } -ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) { - auto filename = abort_idx_name(snapshot.first, snapshot.last); +ss::future<> rm_stm::save_abort_snapshot(abort_snapshot&& snapshot) { + auto first_offset = snapshot.first; + auto last_offset = snapshot.last; + auto filename = abort_idx_name(first_offset, last_offset); vlog(_ctx_log.debug, "saving abort snapshot {} at {}", snapshot, filename); iobuf snapshot_data; - reflection::adl{}.to(snapshot_data, snapshot); + reflection::adl{}.to(snapshot_data, std::move(snapshot)); int32_t snapshot_size = snapshot_data.size_bytes(); auto writer = co_await _abort_snapshot_mgr.start_snapshot(filename); @@ -2844,7 +2846,7 @@ ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) { uint64_t snapshot_disk_size = co_await _abort_snapshot_mgr.get_snapshot_size(filename); _abort_snapshot_sizes.emplace( - std::make_pair(snapshot.first, snapshot.last), snapshot_disk_size); + std::make_pair(first_offset, last_offset), snapshot_disk_size); } ss::future> @@ -2968,7 +2970,7 @@ ss::future<> rm_stm::clear_old_tx_pids() { co_return; } - std::vector pids_for_delete; + fragmented_vector pids_for_delete; for (auto [id, epoch] : _log_state.fence_pid_epoch) { auto pid = model::producer_identity(id, epoch); // If pid is not inside tx_seqs it means we do not have transaction for diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index f6214b00e254..ca02952fc963 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -26,6 +26,7 @@ #include "storage/snapshot.h" #include "utils/available_promise.h" #include "utils/expiring_promise.h" +#include "utils/fragmented_vector.h" #include "utils/mutex.h" #include "utils/prefix_logger.h" #include "utils/tracking_allocator.h" @@ -129,13 +130,13 @@ class rm_stm final : public persisted_stm { struct tx_snapshot { static constexpr uint8_t version = 3; - std::vector fenced; - std::vector ongoing; - std::vector prepared; - std::vector aborted; - std::vector abort_indexes; + fragmented_vector fenced; + fragmented_vector ongoing; + fragmented_vector prepared; + fragmented_vector aborted; + fragmented_vector abort_indexes; model::offset offset; - std::vector seqs; + fragmented_vector seqs; struct tx_seqs_snapshot { model::producer_identity pid; @@ -147,14 +148,14 @@ class rm_stm final : public persisted_stm { duration_type timeout; }; - std::vector tx_seqs; - std::vector expiration; + fragmented_vector tx_seqs; + fragmented_vector expiration; }; struct abort_snapshot { model::offset first; model::offset last; - std::vector aborted; + fragmented_vector aborted; bool match(abort_index idx) { return idx.first == first && idx.last == last; @@ -187,7 +188,7 @@ class rm_stm final : public persisted_stm { model::producer_identity, model::tx_seq, model::timeout_clock::duration); model::offset last_stable_offset(); - ss::future> + ss::future> aborted_transactions(model::offset, model::offset); model::offset max_collectible_offset() override { @@ -198,7 +199,7 @@ class rm_stm final : public persisted_stm { return storage::stm_type::transactional; } - ss::future> + ss::future> aborted_tx_ranges(model::offset from, model::offset to) override { return aborted_transactions(from, to); } @@ -297,7 +298,7 @@ class rm_stm final : public persisted_stm { private: void setup_metrics(); ss::future<> do_remove_persistent_state(); - ss::future> + ss::future> do_aborted_transactions(model::offset, model::offset); ss::future> do_begin_tx( @@ -317,7 +318,7 @@ class rm_stm final : public persisted_stm { ss::future<> apply_snapshot(stm_snapshot_header, iobuf&&) override; ss::future take_snapshot() override; ss::future> load_abort_snapshot(abort_index); - ss::future<> save_abort_snapshot(abort_snapshot); + ss::future<> save_abort_snapshot(abort_snapshot&&); bool check_seq(model::batch_identity, model::term_id); std::optional known_seq(model::batch_identity) const; @@ -472,8 +473,8 @@ class rm_stm final : public persisted_stm { model::producer_identity, prepare_marker> prepared; - std::vector aborted; - std::vector abort_indexes; + fragmented_vector aborted; + fragmented_vector abort_indexes; abort_snapshot last_abort_snapshot{.last = model::offset(-1)}; // the only piece of data which we update on replay and before // replicating the command. we use the highest seq number to resolve @@ -659,7 +660,8 @@ class rm_stm final : public persisted_stm { int32_t last_seq{-1}; result r = errc::success; bool is_processing; - std::vector>>> + fragmented_vector< + ss::lw_shared_ptr>>> parked; }; diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index b44595331df6..cf019729f65e 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -21,6 +21,7 @@ #include "storage/logger.h" #include "storage/segment_appender.h" #include "units.h" +#include "utils/fragmented_vector.h" #include #include @@ -187,7 +188,7 @@ class tx_reducer : public compaction_reducer { public: explicit tx_reducer( ss::lw_shared_ptr stm_mgr, - std::vector&& txs, + fragmented_vector&& txs, compacted_index_writer* w) noexcept : _delegate(index_rebuilder_reducer(w)) , _aborted_txs(model::tx_range_cmp(), std::move(txs)) @@ -234,7 +235,7 @@ class tx_reducer : public compaction_reducer { // A min heap of aborted transactions based on begin offset. using underlying_t = std::priority_queue< model::tx_range, - std::vector, + fragmented_vector, model::tx_range_cmp>; underlying_t _aborted_txs; // Current list of aborted transactions maintained up to the diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 703412ddbf03..a2f4a73ab9c9 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -507,7 +507,7 @@ ss::future> do_self_compact_segment( ss::future<> rebuild_compaction_index( model::record_batch_reader rdr, ss::lw_shared_ptr stm_manager, - std::vector&& aborted_txs, + fragmented_vector&& aborted_txs, segment_full_path p, compaction_config cfg, storage_resources& resources) { diff --git a/src/v/storage/types.h b/src/v/storage/types.h index 1146f28bfbd2..4f4b97fa96a1 100644 --- a/src/v/storage/types.h +++ b/src/v/storage/types.h @@ -71,7 +71,7 @@ class snapshotable_stm { // Only valid for state machines maintaining transactional state. // Returns aborted transactions in range [from, to] offsets. - virtual ss::future> + virtual ss::future> aborted_tx_ranges(model::offset, model::offset) = 0; virtual model::control_record_type @@ -133,9 +133,9 @@ class stm_manager { model::offset max_collectible_offset(); - ss::future> + ss::future> aborted_tx_ranges(model::offset to, model::offset from) { - std::vector r; + fragmented_vector r; if (_tx_stm) { r = co_await _tx_stm->aborted_tx_ranges(to, from); }