Skip to content

Commit

Permalink
storage: GC segments contained by the pack (#6010)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
Co-authored-by: Flowyi <[email protected]>
  • Loading branch information
3 people authored Sep 22, 2022
1 parent f22873c commit 43571c0
Show file tree
Hide file tree
Showing 11 changed files with 647 additions and 155 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
M(gc_skip_update_safe_point) \
M(gc_skip_merge_delta) \
M(gc_skip_merge) \
M(force_set_page_file_write_errno) \
M(force_split_io_size_4k) \
M(minimum_block_size_for_cross_join) \
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1546,17 +1546,17 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
total_delta_valid_cache_rows += delta->getValidCacheRows();
}

if (stable->getPacks())
if (stable->getDMFilesPacks())
{
stat.total_rows += stable->getRows();
stat.total_size += stable->getBytes();

stat.stable_count += 1;
stat.total_pack_count_in_stable += stable->getPacks();
stat.total_pack_count_in_stable += stable->getDMFilesPacks();

stat.total_stable_rows += stable->getRows();
stat.total_stable_size += stable->getBytes();
stat.total_stable_size_on_disk += stable->getBytesOnDisk();
stat.total_stable_size_on_disk += stable->getDMFilesBytesOnDisk();
}
}

Expand Down Expand Up @@ -1679,10 +1679,10 @@ SegmentStats DeltaMergeStore::getSegmentStats()
stat.size = delta->getBytes() + stable->getBytes();
stat.delete_ranges = delta->getDeletes();

stat.stable_size_on_disk = stable->getBytesOnDisk();
stat.stable_size_on_disk = stable->getDMFilesBytesOnDisk();

stat.delta_pack_count = delta->getColumnFileCount();
stat.stable_pack_count = stable->getPacks();
stat.stable_pack_count = stable->getDMFilesPacks();

stat.avg_delta_pack_rows = static_cast<Float64>(delta->getRows()) / stat.delta_pack_count;
stat.avg_stable_pack_rows = static_cast<Float64>(stable->getRows()) / stat.stable_pack_count;
Expand Down
111 changes: 77 additions & 34 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ namespace DB
namespace FailPoints
{
extern const char gc_skip_update_safe_point[];
extern const char gc_skip_merge_delta[];
extern const char gc_skip_merge[];
extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
} // namespace FailPoints
Expand Down Expand Up @@ -396,12 +398,22 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
double invalid_data_ratio_threshold,
const LoggerPtr & log)
{
auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange());
// Do a quick check about whether the DTFile is completely included in the segment range
if (first_pack_included && last_pack_included)
if (snap->stable->getDMFilesPacks() == 0)
{
LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange marking "
"segment as valid data ratio checked because all packs are included, segment={}",
LOG_FMT_TRACE(
log,
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange skipped segment "
"because the DTFile of stable is empty, segment={}",
seg->info());
return false;
}

auto at_least_result = snap->stable->getAtLeastRowsAndBytes(context, seg->getRowKeyRange());
if (at_least_result.first_pack_intersection == RSResult::All //
&& at_least_result.last_pack_intersection == RSResult::All)
{
LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange permanently skipped segment "
"because all packs in DTFiles are fully contained by the segment range, segment={}",
seg->info());
seg->setValidDataRatioChecked();
return false;
Expand All @@ -410,34 +422,34 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
std::unordered_set<UInt64> prev_segment_file_ids = getDMFileIDs(prev_seg);
std::unordered_set<UInt64> next_segment_file_ids = getDMFileIDs(next_seg);

// Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
bool contains_invalid_data = false;
const auto & dt_files = snap->stable->getDMFiles();
if (!first_pack_included)
if (at_least_result.first_pack_intersection != RSResult::All)
{
auto first_file_id = dt_files[0]->fileId();
if (prev_segment_file_ids.count(first_file_id) == 0)
auto first_file_id = dt_files.front()->fileId();
if (prev_seg != nullptr && prev_segment_file_ids.count(first_file_id) == 0)
{
contains_invalid_data = true;
}
}
if (!last_pack_included)
if (at_least_result.last_pack_intersection != RSResult::All)
{
auto last_file_id = dt_files[dt_files.size() - 1]->fileId();
if (next_segment_file_ids.count(last_file_id) == 0)
auto last_file_id = dt_files.back()->fileId();
if (next_seg != nullptr && next_segment_file_ids.count(last_file_id) == 0)
{
contains_invalid_data = true;
}
}
// Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
if (!contains_invalid_data)
{
LOG_FMT_TRACE(
log,
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false because no invalid data, "
"segment={} first_pack_included={} last_pack_included={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}]",
seg->simpleInfo(),
first_pack_included,
last_pack_included,
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false "
"because segment DTFile is shared with a neighbor segment, "
"first_pack_inc={} last_pack_inc={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}] segment={}",
magic_enum::enum_name(at_least_result.first_pack_intersection),
magic_enum::enum_name(at_least_result.last_pack_intersection),
fmt::join(prev_segment_file_ids, ","),
fmt::join(next_segment_file_ids, ","),
[&] {
Expand All @@ -450,30 +462,53 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
},
",");
return fmt_buf.toString();
}());
}(),
seg->info());
// We do not mark `setValidDataRatioChecked` because neighbor segments' state could change.
return false;
}

size_t total_rows = 0;
size_t total_bytes = 0;
for (const auto & file : dt_files)
{
total_rows += file->getRows();
total_bytes += file->getBytes();
}
auto valid_rows = snap->stable->getRows();
auto valid_bytes = snap->stable->getBytes();

auto check_result = (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold));
size_t file_rows = snap->stable->getDMFilesRows();
size_t file_bytes = snap->stable->getDMFilesBytes();

// We use at_least_rows|bytes, instead of stable_rows|bytes. The difference is that, at_least_rows|bytes only count packs
// that are fully contained in the segment range, while stable_rows|bytes count packs that are intersected with the segment
// range.
//
// Consider the following case, where the segment only contains one pack:
// │***** ******│ DTFile only contains 1 pack
// │<------>│ Segment
// This kind of data layout may be produced by logical split. In this case, ratio calculated using at_least_rows would be 0%,
// but ratio calculated using stable_rows would be 100%.
// We definitely want such a DTFile to be reclaimed, because this segment does not contain any real rows at all!
//
// Of course there are false positives, consider the following case:
// │*************************│ DTFile only contains 1 pack
// │<------------------->│ Segment
// The segment is containing most of the data in the DTFile and not much space can be reclaimed after merging the delta.
// We are just wasting the disk IO when doing the GC.
// This is currently acceptable, considering that:
// 1) The cost of rewriting the stable of 1 pack is small
// 2) After rewriting, the segment will not need to be rewritten again, as it will look like:
// │*********************│ DTFile only contains 1 pack
// │<------------------->│ Segment
//
// See https://github.com/pingcap/tiflash/pull/6010 for more details.

auto check_result = (at_least_result.rows < file_rows * (1 - invalid_data_ratio_threshold)) //
|| (at_least_result.bytes < file_bytes * (1 - invalid_data_ratio_threshold));
LOG_FMT_TRACE(
log,
"GC - Checking shouldCompactStableWithTooMuchDataOutOfSegmentRange, "
"check_result={} valid_rows={} valid_bytes={} file_rows={} file_bytes={}",
"check_result={} first_pack_inc={} last_pack_inc={} rows_at_least={} bytes_at_least={} file_rows={} file_bytes={} segment={} ",
check_result,
valid_rows,
valid_bytes,
total_rows,
total_bytes);
magic_enum::enum_name(at_least_result.first_pack_intersection),
magic_enum::enum_name(at_least_result.last_pack_intersection),
at_least_result.rows,
at_least_result.bytes,
file_rows,
file_bytes,
seg->info());
seg->setValidDataRatioChecked();
return check_result;
}
Expand All @@ -482,6 +517,10 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte

SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, const SegmentPtr & segment)
{
fiu_do_on(FailPoints::gc_skip_merge, {
return {};
});

auto segment_rows = segment->getEstimatedRows();
auto segment_bytes = segment->getEstimatedBytes();
if (segment_rows >= dm_context->small_segment_rows || segment_bytes >= dm_context->small_segment_bytes)
Expand Down Expand Up @@ -521,6 +560,10 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c

SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_context, const SegmentPtr & segment, const SegmentPtr & prev_segment, const SegmentPtr & next_segment, DB::Timestamp gc_safe_point)
{
fiu_do_on(FailPoints::gc_skip_merge_delta, {
return {};
});

SegmentSnapshotPtr segment_snap;
{
std::shared_lock lock(read_write_mutex);
Expand Down
15 changes: 12 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplit(DMContext & dm_context,
{
// When split point is not specified, there are some preconditions in order to use logical split.
if (!dm_context.enable_logical_split //
|| segment_snap->stable->getPacks() <= 3 //
|| segment_snap->stable->getDMFilesPacks() <= 3 //
|| segment_snap->delta->getRows() > segment_snap->stable->getRows())
{
try_split_mode = SplitMode::Physical;
Expand Down Expand Up @@ -1565,18 +1565,27 @@ String Segment::simpleInfo() const

String Segment::info() const
{
return fmt::format("<segment_id={} epoch={} range={}{} next_segment_id={} delta_rows={} delta_bytes={} delta_deletes={} stable_file={} stable_rows={} stable_bytes={}>",
return fmt::format("<segment_id={} epoch={} range={}{} next_segment_id={} "
"delta_rows={} delta_bytes={} delta_deletes={} "
"stable_file={} stable_rows={} stable_bytes={} "
"dmf_rows={} dmf_bytes={} dmf_packs={}>",
segment_id,
epoch,
rowkey_range.toDebugString(),
hasAbandoned() ? " abandoned=true" : "",
next_segment_id,

delta->getRows(),
delta->getBytes(),
delta->getDeletes(),

stable->getDMFilesString(),
stable->getRows(),
stable->getBytes());
stable->getBytes(),

stable->getDMFilesRows(),
stable->getDMFilesBytes(),
stable->getDMFilesPacks());
}

String Segment::simpleInfo(const std::vector<SegmentPtr> & segments)
Expand Down
62 changes: 47 additions & 15 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,24 +128,38 @@ size_t StableValueSpace::getBytes() const
return valid_bytes;
}

size_t StableValueSpace::getBytesOnDisk() const
size_t StableValueSpace::getDMFilesBytesOnDisk() const
{
// If this stable value space is logical splitted, some file may not used,
// and this will return more bytes than actual used.
size_t bytes = 0;
for (const auto & file : files)
bytes += file->getBytesOnDisk();
return bytes;
}

size_t StableValueSpace::getPacks() const
size_t StableValueSpace::getDMFilesPacks() const
{
size_t packs = 0;
for (const auto & file : files)
packs += file->getPacks();
return packs;
}

size_t StableValueSpace::getDMFilesRows() const
{
size_t rows = 0;
for (const auto & file : files)
rows += file->getRows();
return rows;
}

size_t StableValueSpace::getDMFilesBytes() const
{
size_t bytes = 0;
for (const auto & file : files)
bytes += file->getBytes();
return bytes;
}

String StableValueSpace::getDMFilesString()
{
String s;
Expand Down Expand Up @@ -397,16 +411,17 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
return {approx_rows, approx_bytes};
}

std::pair<bool, bool> StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const
StableValueSpace::Snapshot::AtLeastRowsAndBytesResult //
StableValueSpace::Snapshot::getAtLeastRowsAndBytes(const DMContext & context, const RowKeyRange & range) const
{
AtLeastRowsAndBytesResult ret{};

// Usually, this method will be called for some "cold" key ranges.
// Loading the index into cache may pollute the cache and make the hot index cache invalid.
// So don't refill the cache if the index does not exist.
bool first_pack_included = false;
bool last_pack_included = false;
for (size_t i = 0; i < stable->files.size(); i++)
for (size_t file_idx = 0; file_idx < stable->files.size(); ++file_idx)
{
const auto & file = stable->files[i];
const auto & file = stable->files[file_idx];
auto filter = DMFilePackFilter::loadFrom(
file,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
Expand All @@ -417,20 +432,37 @@ std::pair<bool, bool> StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRa
context.db_context.getFileProvider(),
context.getReadLimiter(),
context.tracing_id);
const auto & use_packs = filter.getUsePacks();
if (i == 0)
const auto & handle_filter_result = filter.getHandleRes();
if (file_idx == 0)
{
// TODO: this check may not be correct when support multiple files in a stable, let's just keep it now for simplicity
first_pack_included = use_packs.empty() || use_packs[0];
if (handle_filter_result.empty())
ret.first_pack_intersection = RSResult::None;
else
ret.first_pack_intersection = handle_filter_result.front();
}
if (i == stable->files.size() - 1)
if (file_idx == stable->files.size() - 1)
{
// TODO: this check may not be correct when support multiple files in a stable, let's just keep it now for simplicity
last_pack_included = use_packs.empty() || use_packs.back();
if (handle_filter_result.empty())
ret.last_pack_intersection = RSResult::None;
else
ret.last_pack_intersection = handle_filter_result.back();
}

const auto & pack_stats = file->getPackStats();
for (size_t pack_idx = 0; pack_idx < pack_stats.size(); ++pack_idx)
{
// Only count packs that are fully contained by the range.
if (handle_filter_result[pack_idx] == RSResult::All)
{
ret.rows += pack_stats[pack_idx].rows;
ret.bytes += pack_stats[pack_idx].bytes;
}
}
}

return std::make_pair(first_pack_included, last_pack_included);
return ret;
}

} // namespace DM
Expand Down
Loading

0 comments on commit 43571c0

Please sign in to comment.