Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: GC segments contained by the pack #6010

Merged
merged 23 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8e6f884
cmake: Build rust deps using debug profile when build-type is not rel…
breezewish Sep 13, 2022
9d5b94e
Respect SAN_DEBUG
breezewish Sep 13, 2022
b65b198
Fix linking debug libraries
breezewish Sep 13, 2022
6e9785f
Merge branch 'master' into wenxuan/cmake-1
ti-chi-bot Sep 16, 2022
1857846
Merge branch 'master' into wenxuan/cmake-1
ti-chi-bot Sep 16, 2022
8168196
Merge branch 'master' into wenxuan/cmake-1
ti-chi-bot Sep 16, 2022
31cc955
Merge branch 'master' into wenxuan/cmake-1
breezewish Sep 17, 2022
18296ce
Disable overflow checks for libsymbolization
breezewish Sep 18, 2022
5093e3a
cmake: Add find-rust
breezewish Sep 19, 2022
2c8c5fe
Refactor README
breezewish Sep 19, 2022
d3148d8
Give some default directory names by default
breezewish Sep 19, 2022
2dcd3e5
Merge branch 'master' into wenxuan/cmake-find-rust
breezewish Sep 19, 2022
38d59f1
Seems that CI is using another env file. Let's try to make it simple.
breezewish Sep 19, 2022
5b8c686
Introduce a new framework for DMStore similar to Segment test
breezewish Sep 21, 2022
cd863a5
Reformat
breezewish Sep 21, 2022
62dd777
Merge remote-tracking branch 'origin/master' into wenxuan/dm_test
breezewish Sep 21, 2022
0dd1712
storage: merge segments that is contained by one pack
breezewish Sep 22, 2022
3e6000d
Just one more test case
breezewish Sep 22, 2022
07bf54f
Merge branch 'master' into wenxuan/dm_test
flowbehappy Sep 22, 2022
b09f91b
simplify
breezewish Sep 22, 2022
a0e2df9
Merge commit '07bf54fe47811872cf32b22e176e4d37aae6d4ff' into wenxuan/gc
breezewish Sep 22, 2022
c6ae68f
Add comments
breezewish Sep 22, 2022
ca9529d
Merge remote-tracking branch 'origin/master' into wenxuan/gc
breezewish Sep 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
M(gc_skip_update_safe_point) \
M(gc_skip_merge_delta) \
M(gc_skip_merge) \
M(force_set_page_file_write_errno) \
M(force_split_io_size_4k) \
M(minimum_block_size_for_cross_join) \
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1546,17 +1546,17 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
total_delta_valid_cache_rows += delta->getValidCacheRows();
}

if (stable->getPacks())
if (stable->getDMFilesPacks())
{
stat.total_rows += stable->getRows();
stat.total_size += stable->getBytes();

stat.stable_count += 1;
stat.total_pack_count_in_stable += stable->getPacks();
stat.total_pack_count_in_stable += stable->getDMFilesPacks();

stat.total_stable_rows += stable->getRows();
stat.total_stable_size += stable->getBytes();
stat.total_stable_size_on_disk += stable->getBytesOnDisk();
stat.total_stable_size_on_disk += stable->getDMFilesBytesOnDisk();
}
}

Expand Down Expand Up @@ -1679,10 +1679,10 @@ SegmentStats DeltaMergeStore::getSegmentStats()
stat.size = delta->getBytes() + stable->getBytes();
stat.delete_ranges = delta->getDeletes();

stat.stable_size_on_disk = stable->getBytesOnDisk();
stat.stable_size_on_disk = stable->getDMFilesBytesOnDisk();

stat.delta_pack_count = delta->getColumnFileCount();
stat.stable_pack_count = stable->getPacks();
stat.stable_pack_count = stable->getDMFilesPacks();

stat.avg_delta_pack_rows = static_cast<Float64>(delta->getRows()) / stat.delta_pack_count;
stat.avg_stable_pack_rows = static_cast<Float64>(stable->getRows()) / stat.stable_pack_count;
Expand Down
82 changes: 53 additions & 29 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ namespace DB
namespace FailPoints
{
extern const char gc_skip_update_safe_point[];
extern const char gc_skip_merge_delta[];
extern const char gc_skip_merge[];
extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
} // namespace FailPoints
Expand Down Expand Up @@ -396,12 +398,22 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
double invalid_data_ratio_threshold,
const LoggerPtr & log)
{
auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange());
// Do a quick check about whether the DTFile is completely included in the segment range
if (first_pack_included && last_pack_included)
if (snap->stable->getDMFilesPacks() == 0)
{
LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange marking "
"segment as valid data ratio checked because all packs are included, segment={}",
LOG_FMT_TRACE(
log,
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange skipped segment "
"because the DTFile of stable is empty, segment={}",
seg->info());
return false;
}

auto at_least_result = snap->stable->getAtLeastRowsAndBytes(context, seg->getRowKeyRange());
if (at_least_result.first_pack_intersection == RSResult::All //
&& at_least_result.last_pack_intersection == RSResult::All)
{
LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange permanently skipped segment "
"because all packs in DTFiles are fully contained by the segment range, segment={}",
seg->info());
seg->setValidDataRatioChecked();
return false;
Expand All @@ -410,34 +422,35 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
std::unordered_set<UInt64> prev_segment_file_ids = getDMFileIDs(prev_seg);
std::unordered_set<UInt64> next_segment_file_ids = getDMFileIDs(next_seg);

// Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
bool contains_invalid_data = false;
const auto & dt_files = snap->stable->getDMFiles();
if (!first_pack_included)
if (at_least_result.first_pack_intersection != RSResult::All)
{
auto first_file_id = dt_files[0]->fileId();
if (prev_segment_file_ids.count(first_file_id) == 0)
auto first_file_id = dt_files.front()->fileId();
if (prev_seg != nullptr && prev_segment_file_ids.count(first_file_id) == 0)
{
contains_invalid_data = true;
}
}
if (!last_pack_included)
if (at_least_result.last_pack_intersection != RSResult::All)
{
auto last_file_id = dt_files[dt_files.size() - 1]->fileId();
if (next_segment_file_ids.count(last_file_id) == 0)
auto last_file_id = dt_files.back()->fileId();
if (next_seg != nullptr && next_segment_file_ids.count(last_file_id) == 0)
{
contains_invalid_data = true;
}
}
// Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
if (!contains_invalid_data)
{
LOG_FMT_TRACE(
log,
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false because no invalid data, "
"segment={} first_pack_included={} last_pack_included={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}]",
seg->simpleInfo(),
first_pack_included,
last_pack_included,
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false "
"because segment DTFile is shared with a neighbor segment, "
"segment={} first_pack_inc={} last_pack_inc={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}]",
seg->info(),
magic_enum::enum_name(at_least_result.first_pack_intersection),
magic_enum::enum_name(at_least_result.last_pack_intersection),
fmt::join(prev_segment_file_ids, ","),
fmt::join(next_segment_file_ids, ","),
[&] {
Expand All @@ -451,29 +464,32 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
",");
return fmt_buf.toString();
}());
// We do not mark `setValidDataRatioChecked` because neighbor segments' state could change.
return false;
}

size_t total_rows = 0;
size_t total_bytes = 0;
size_t dt_file_rows = 0;
size_t dt_file_bytes = 0;
for (const auto & file : dt_files)
{
total_rows += file->getRows();
total_bytes += file->getBytes();
dt_file_rows += file->getRows();
dt_file_bytes += file->getBytes();
}
auto valid_rows = snap->stable->getRows();
auto valid_bytes = snap->stable->getBytes();

auto check_result = (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold));
auto check_result = (at_least_result.rows < dt_file_rows * (1 - invalid_data_ratio_threshold)) //
flowbehappy marked this conversation as resolved.
Show resolved Hide resolved
|| (at_least_result.bytes < dt_file_bytes * (1 - invalid_data_ratio_threshold));
LOG_FMT_TRACE(
log,
"GC - Checking shouldCompactStableWithTooMuchDataOutOfSegmentRange, "
"check_result={} valid_rows={} valid_bytes={} file_rows={} file_bytes={}",
"segment={} check_result={} first_pack_inc={} last_pack_inc={} rows_at_least={} bytes_at_least={} file_rows={} file_bytes={}",
seg->info(),
check_result,
valid_rows,
valid_bytes,
total_rows,
total_bytes);
magic_enum::enum_name(at_least_result.first_pack_intersection),
magic_enum::enum_name(at_least_result.last_pack_intersection),
at_least_result.rows,
at_least_result.bytes,
dt_file_rows,
dt_file_bytes);
seg->setValidDataRatioChecked();
return check_result;
}
Expand All @@ -482,6 +498,10 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte

SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, const SegmentPtr & segment)
{
fiu_do_on(FailPoints::gc_skip_merge, {
return {};
});

auto segment_rows = segment->getEstimatedRows();
auto segment_bytes = segment->getEstimatedBytes();
if (segment_rows >= dm_context->small_segment_rows || segment_bytes >= dm_context->small_segment_bytes)
Expand Down Expand Up @@ -521,6 +541,10 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c

SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_context, const SegmentPtr & segment, const SegmentPtr & prev_segment, const SegmentPtr & next_segment, DB::Timestamp gc_safe_point)
{
fiu_do_on(FailPoints::gc_skip_merge_delta, {
return {};
});

SegmentSnapshotPtr segment_snap;
{
std::shared_lock lock(read_write_mutex);
Expand Down
15 changes: 12 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplit(DMContext & dm_context,
{
// When split point is not specified, there are some preconditions in order to use logical split.
if (!dm_context.enable_logical_split //
|| segment_snap->stable->getPacks() <= 3 //
|| segment_snap->stable->getDMFilesPacks() <= 3 //
|| segment_snap->delta->getRows() > segment_snap->stable->getRows())
{
try_split_mode = SplitMode::Physical;
Expand Down Expand Up @@ -1565,18 +1565,27 @@ String Segment::simpleInfo() const

String Segment::info() const
{
return fmt::format("<segment_id={} epoch={} range={}{} next_segment_id={} delta_rows={} delta_bytes={} delta_deletes={} stable_file={} stable_rows={} stable_bytes={}>",
return fmt::format("<segment_id={} epoch={} range={}{} next_segment_id={} "
"delta_rows={} delta_bytes={} delta_deletes={} "
"stable_file={} stable_rows={} stable_bytes={} "
"dmf_rows={} dmf_bytes={} dmf_packs={}>",
segment_id,
epoch,
rowkey_range.toDebugString(),
hasAbandoned() ? " abandoned=true" : "",
next_segment_id,

delta->getRows(),
delta->getBytes(),
delta->getDeletes(),

stable->getDMFilesString(),
stable->getRows(),
stable->getBytes());
stable->getBytes(),

stable->getDMFilesRows(),
stable->getDMFilesBytes(),
stable->getDMFilesPacks());
}

String Segment::simpleInfo(const std::vector<SegmentPtr> & segments)
Expand Down
62 changes: 47 additions & 15 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,24 +128,38 @@ size_t StableValueSpace::getBytes() const
return valid_bytes;
}

size_t StableValueSpace::getBytesOnDisk() const
size_t StableValueSpace::getDMFilesBytesOnDisk() const
{
// If this stable value space is logical splitted, some file may not used,
// and this will return more bytes than actual used.
size_t bytes = 0;
for (const auto & file : files)
bytes += file->getBytesOnDisk();
return bytes;
}

size_t StableValueSpace::getPacks() const
size_t StableValueSpace::getDMFilesPacks() const
{
size_t packs = 0;
for (const auto & file : files)
packs += file->getPacks();
return packs;
}

size_t StableValueSpace::getDMFilesRows() const
{
size_t rows = 0;
for (const auto & file : files)
rows += file->getRows();
return rows;
}

size_t StableValueSpace::getDMFilesBytes() const
{
size_t bytes = 0;
for (const auto & file : files)
bytes += file->getBytes();
return bytes;
}

String StableValueSpace::getDMFilesString()
{
String s;
Expand Down Expand Up @@ -397,16 +411,17 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
return {approx_rows, approx_bytes};
}

std::pair<bool, bool> StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const
StableValueSpace::Snapshot::AtLeastRowsAndBytesResult //
StableValueSpace::Snapshot::getAtLeastRowsAndBytes(const DMContext & context, const RowKeyRange & range) const
{
AtLeastRowsAndBytesResult ret{};

// Usually, this method will be called for some "cold" key ranges.
// Loading the index into cache may pollute the cache and make the hot index cache invalid.
// So don't refill the cache if the index does not exist.
bool first_pack_included = false;
bool last_pack_included = false;
for (size_t i = 0; i < stable->files.size(); i++)
for (size_t file_idx = 0; file_idx < stable->files.size(); ++file_idx)
{
const auto & file = stable->files[i];
const auto & file = stable->files[file_idx];
auto filter = DMFilePackFilter::loadFrom(
file,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
Expand All @@ -417,20 +432,37 @@ std::pair<bool, bool> StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRa
context.db_context.getFileProvider(),
context.getReadLimiter(),
context.tracing_id);
const auto & use_packs = filter.getUsePacks();
if (i == 0)
const auto & handle_filter_result = filter.getHandleRes();
if (file_idx == 0)
{
// TODO: this check may not be correct when support multiple files in a stable, let's just keep it now for simplicity
first_pack_included = use_packs.empty() || use_packs[0];
if (handle_filter_result.empty())
ret.first_pack_intersection = RSResult::None;
else
ret.first_pack_intersection = handle_filter_result.front();
}
if (i == stable->files.size() - 1)
if (file_idx == stable->files.size() - 1)
{
// TODO: this check may not be correct when support multiple files in a stable, let's just keep it now for simplicity
last_pack_included = use_packs.empty() || use_packs.back();
if (handle_filter_result.empty())
ret.last_pack_intersection = RSResult::None;
else
ret.last_pack_intersection = handle_filter_result.back();
}

const auto & pack_stats = file->getPackStats();
for (size_t pack_idx = 0; pack_idx < pack_stats.size(); ++pack_idx)
{
// Only count packs that are fully contained by the range.
if (handle_filter_result[pack_idx] == RSResult::All)
{
ret.rows += pack_stats[pack_idx].rows;
ret.bytes += pack_stats[pack_idx].bytes;
}
}
}

return std::make_pair(first_pack_included, last_pack_included);
return ret;
}

} // namespace DM
Expand Down
Loading