Skip to content

Commit

Permalink
Merge pull request #14368 from andrwng/compaction-dedupe-utils
Browse files Browse the repository at this point in the history
storage: utilities to implement sliding window compaction
  • Loading branch information
dotnwat authored Oct 26, 2023
2 parents 6e684ab + e1d9ff3 commit 3eaf1ba
Show file tree
Hide file tree
Showing 16 changed files with 568 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/v/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ v_cc_library(
NAME storage
SRCS
segment_reader.cc
segment_deduplication_utils.cc
log_manager.cc
disk_log_impl.cc
disk_log_appender.cc
Expand Down
23 changes: 23 additions & 0 deletions src/v/storage/compacted_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/file.hh>
#include <seastar/core/loop.hh>

#include <memory>
namespace storage {
Expand Down Expand Up @@ -68,6 +69,22 @@ class compacted_index_reader {
});
}

template<typename Func>
ss::future<>
for_each_async(Func f, model::timeout_clock::time_point timeout) {
while (true) {
while (likely(!is_slice_empty())) {
if (co_await f(pop_batch()) == ss::stop_iteration::yes) {
co_return;
}
}
if (is_end_of_stream()) {
co_return;
}
co_await do_load_slice(timeout);
}
}

private:
compacted_index::entry pop_batch() {
compacted_index::entry batch = std::move(_slice.front());
Expand Down Expand Up @@ -129,6 +146,12 @@ class compacted_index_reader {
return _impl->consume(std::move(consumer), timeout);
}

template<typename Func>
ss::future<>
for_each_async(Func f, model::timeout_clock::time_point timeout) {
return _impl->for_each_async(std::move(f), timeout);
}

friend std::ostream&
operator<<(std::ostream& o, const compacted_index_reader& r) {
r.print(o);
Expand Down
55 changes: 41 additions & 14 deletions src/v/storage/compaction_reducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,28 @@ compacted_offset_list_reducer::operator()(compacted_index::entry&& e) {
return ss::make_ready_future<stop_t>(stop_t::no);
}

std::optional<model::record_batch>
copy_data_segment_reducer::filter(model::record_batch&& batch) {
ss::future<> copy_data_segment_reducer::maybe_keep_offset(
const model::record_batch& batch,
const model::record& r,
std::vector<int32_t>& offset_deltas) {
if (co_await _should_keep_fn(batch, r)) {
offset_deltas.push_back(r.offset_delta());
co_return;
}
// Keep the last record to ensure the bounds of the segment remain.
auto o = batch.base_offset() + model::offset_delta(r.offset_delta());
if (o == _segment_last_offset) {
offset_deltas.push_back(r.offset_delta());
co_return;
}
}

ss::future<std::optional<model::record_batch>>
copy_data_segment_reducer::filter(model::record_batch batch) {
// do not compact raft configuration and archival metadata as they shift
// offset translation
if (!is_compactible(batch)) {
return std::move(batch);
co_return std::move(batch);
}

// 0. Reset the transactional bit, we need not carry it forward.
Expand Down Expand Up @@ -158,18 +174,16 @@ copy_data_segment_reducer::filter(model::record_batch&& batch) {
}

// 1. compute which records to keep
const auto base = batch.base_offset();
std::vector<int32_t> offset_deltas;
offset_deltas.reserve(batch.record_count());
batch.for_each_record([this, base, &offset_deltas](const model::record& r) {
if (should_keep(base, r.offset_delta())) {
offset_deltas.push_back(r.offset_delta());
}
});
co_await batch.for_each_record_async(
[this, &batch, &offset_deltas](const model::record& r) {
return maybe_keep_offset(batch, r, offset_deltas);
});

// 2. no record to keep
if (offset_deltas.empty()) {
return std::nullopt;
co_return std::nullopt;
}

// 3. keep all records
Expand All @@ -178,7 +192,7 @@ copy_data_segment_reducer::filter(model::record_batch&& batch) {
hdr.crc = model::crc_record_batch(batch);
hdr.header_crc = model::internal_header_only_crc(hdr);
}
return std::move(batch);
co_return std::move(batch);
}

// 4. filter
Expand Down Expand Up @@ -235,7 +249,7 @@ copy_data_segment_reducer::filter(model::record_batch&& batch) {
// above. These empty batches are retained only until either a new
// sequence number is written by the corresponding producer or the
// producerId is expired from lack of activity.
return std::nullopt;
co_return std::nullopt;
}

// There is no similar need to preserve the timestamp from the original
Expand Down Expand Up @@ -263,16 +277,29 @@ copy_data_segment_reducer::filter(model::record_batch&& batch) {
reset_size_checksum_metadata(new_hdr, ret);
auto new_batch = model::record_batch(
new_hdr, std::move(ret), model::record_batch::tag_ctor_ng{});
return new_batch;
co_return new_batch;
}

ss::future<ss::stop_iteration> copy_data_segment_reducer::do_compaction(
model::compression original, model::record_batch b) {
using stop_t = ss::stop_iteration;
auto to_copy = filter(std::move(b));
auto to_copy = co_await filter(std::move(b));
if (to_copy == std::nullopt) {
co_return stop_t::no;
}
if (_compacted_idx && is_compactible(to_copy.value())) {
co_await model::for_each_record(
to_copy.value(),
[&batch = to_copy.value(), this](const model::record& r) {
auto& hdr = batch.header();
return _compacted_idx->index(
hdr.type,
hdr.attrs.is_control(),
r.key(),
batch.base_offset(),
r.offset_delta());
});
}
auto batch = co_await compress_batch(original, std::move(to_copy.value()));
auto const start_offset = _appender->file_byte_offset();
auto const header_size = batch.header().size_bytes;
Expand Down
33 changes: 24 additions & 9 deletions src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "utils/fragmented_vector.h"
#include "utils/tracking_allocator.h"

#include <seastar/util/noncopyable_function.hh>

#include <absl/container/btree_map.h>
#include <fmt/core.h>
#include <roaring/roaring.hh>
Expand Down Expand Up @@ -115,13 +117,19 @@ class compacted_offset_list_reducer : public compaction_reducer {

class copy_data_segment_reducer : public compaction_reducer {
public:
using filter_t = ss::noncopyable_function<ss::future<bool>(
const model::record_batch&, const model::record&)>;
copy_data_segment_reducer(
compacted_offset_list l,
filter_t f,
segment_appender* a,
bool internal_topic,
offset_delta_time apply_offset)
: _list(std::move(l))
offset_delta_time apply_offset,
model::offset segment_last_offset = model::offset{},
compacted_index_writer* cidx = nullptr)
: _should_keep_fn(std::move(f))
, _segment_last_offset(segment_last_offset)
, _appender(a)
, _compacted_idx(cidx)
, _idx(index_state::make_empty_index(apply_offset))
, _internal_topic(internal_topic) {}

Expand All @@ -132,14 +140,21 @@ class copy_data_segment_reducer : public compaction_reducer {
ss::future<ss::stop_iteration>
do_compaction(model::compression, model::record_batch);

bool should_keep(model::offset base, int32_t delta) const {
const auto o = base + model::offset(delta);
return _list.contains(o);
}
std::optional<model::record_batch> filter(model::record_batch&&);
ss::future<> maybe_keep_offset(
const model::record_batch&, const model::record&, std::vector<int32_t>&);

compacted_offset_list _list;
ss::future<std::optional<model::record_batch>> filter(model::record_batch);

filter_t _should_keep_fn;

// Offset to keep in case the index is empty as of getting to this offset.
model::offset _segment_last_offset;
segment_appender* _appender;

// Compacted index writer for the newly written segment. May not be
// supplied if the compacted index isn't expected to change, e.g. when
// rewriting a single segment filtering with its own compacted index.
compacted_index_writer* _compacted_idx;
index_state _idx;
size_t _acc{0};

Expand Down
50 changes: 50 additions & 0 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "model/timeout_clock.h"
#include "model/timestamp.h"
#include "reflection/adl.h"
#include "storage/compacted_offset_list.h"
#include "storage/compaction_reducers.h"
#include "storage/disk_log_appender.h"
#include "storage/fwd.h"
#include "storage/kvstore.h"
Expand Down Expand Up @@ -43,6 +45,7 @@
#include <seastar/core/shared_ptr.hh>

#include <fmt/format.h>
#include <roaring/roaring.hh>

#include <exception>
#include <iterator>
Expand Down Expand Up @@ -459,6 +462,53 @@ ss::future<> disk_log_impl::do_compact(
}
}

segment_set disk_log_impl::find_sliding_range(
const compaction_config& cfg, std::optional<model::offset> new_start_offset) {
// Collect all segments that have stable data.
segment_set::underlying_t buf;
for (const auto& seg : _segs) {
if (
new_start_offset
&& seg->offsets().base_offset < new_start_offset.value()) {
// Skip over segments that are being truncated.
continue;
}
if (seg->has_appender() || !seg->has_compactible_offsets(cfg)) {
// Stop once we get to an unstable segment.
break;
}
buf.emplace_back(seg);
}
segment_set segs(std::move(buf));
if (segs.empty()) {
return segs;
}

// If a previous sliding window compaction ran, and there are no new
// segments, segments at the start of that window and above have been
// fully deduplicated and don't need to be compacted further.
//
// If there are new segments that have not been compacted, we can't make
// this claim, and compact everything again.
if (
segs.back()->finished_windowed_compaction()
&& _last_compaction_window_start_offset.has_value()) {
while (!segs.empty()) {
if (
segs.back()->offsets().base_offset
>= _last_compaction_window_start_offset.value()) {
// A previous compaction deduplicated the keys above this
// offset. As such, segments above this point would not benefit
// from being included in the compaction window.
segs.pop_back();
} else {
break;
}
}
}
return segs;
}

std::optional<std::pair<segment_set::iterator, segment_set::iterator>>
disk_log_impl::find_compaction_range(const compaction_config& cfg) {
/*
Expand Down
18 changes: 16 additions & 2 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ class disk_log_impl final : public log {

std::optional<model::offset> retention_offset(gc_config) const final;

// Collects an iterable list of segments over which to perform sliding
// window compaction.
segment_set find_sliding_range(
const compaction_config& cfg,
std::optional<model::offset> new_start_offset = std::nullopt);

void set_last_compaction_window_start_offset(model::offset o) {
_last_compaction_window_start_offset = o;
}

readers_cache& readers() { return *_readers_cache; }

storage_resources& resources();

private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
Expand All @@ -168,6 +182,7 @@ class disk_log_impl final : public log {
storage::compaction_config cfg);
std::optional<std::pair<segment_set::iterator, segment_set::iterator>>
find_compaction_range(const compaction_config&);

ss::future<std::optional<model::offset>> do_gc(gc_config);

ss::future<> remove_empty_segments();
Expand Down Expand Up @@ -210,8 +225,6 @@ class disk_log_impl final : public log {
gc_config apply_overrides(gc_config) const;
gc_config apply_base_overrides(gc_config) const;

storage_resources& resources();

void wrote_stm_bytes(size_t);

// returns true if this partition's local retention configuration has
Expand Down Expand Up @@ -291,6 +304,7 @@ class disk_log_impl final : public log {
mutex _segments_rolling_lock;

std::optional<model::offset> _cloud_gc_offset;
std::optional<model::offset> _last_compaction_window_start_offset;
size_t _reclaimable_local_size_bytes{0};
};

Expand Down
14 changes: 9 additions & 5 deletions src/v/storage/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@
namespace storage {

class api;
class node_api;
class compacted_index_writer;
class compaction_controller;
class key_offset_map;
class kvstore;
class node_api;
class ntp_config;
class log;
class log_manager;
class ntp_config;
class probe;
class offset_translator_state;
class readers_cache;
class segment;
class segment_appender;
class simple_snapshot_manager;
class snapshot_manager;
class storage_resources;
class readers_cache;
class compaction_controller;
class offset_translator_state;
struct compaction_config;
struct log_reader_config;
struct timequery_config;

Expand Down
4 changes: 2 additions & 2 deletions src/v/storage/key_offset_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

namespace storage {

simple_key_offset_map::simple_key_offset_map(size_t max_keys)
simple_key_offset_map::simple_key_offset_map(std::optional<size_t> max_keys)
: _memory_tracker(ss::make_shared<util::mem_tracker>("simple_key_offset_map"))
, _map(util::mem_tracked::map<absl::btree_map, compaction_key, model::offset>(
_memory_tracker))
, _max_keys(max_keys) {}
, _max_keys(max_keys ? *max_keys : default_key_limit) {}

seastar::future<bool>
simple_key_offset_map::put(const compaction_key& key, model::offset o) {
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/key_offset_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class simple_key_offset_map final : public key_offset_map {
* Construct a new simple_key_offset_map with \p max_key maximum number of
* keys.
*/
explicit simple_key_offset_map(size_t max_keys = default_key_limit);
explicit simple_key_offset_map(
std::optional<size_t> max_keys = std::nullopt);

seastar::future<bool>
put(const compaction_key& key, model::offset offset) override;
Expand Down
Loading

0 comments on commit 3eaf1ba

Please sign in to comment.