Skip to content

Commit

Permalink
clean up segments with large space amplification ratio generated by l…
Browse files Browse the repository at this point in the history
…ogical split (#5860)

close #5817
  • Loading branch information
lidezhu authored Sep 14, 2022
1 parent 3e525b5 commit 1fdea3e
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 18 deletions.
165 changes: 148 additions & 17 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ extern const Metric DT_SnapshotOfDeltaMerge;

namespace DB
{

namespace FailPoints
{
extern const char pause_before_dt_background_delta_merge[];
Expand All @@ -34,7 +33,6 @@ extern const char pause_until_dt_background_delta_merge[];

namespace DM
{

void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
Expand Down Expand Up @@ -249,9 +247,32 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)

namespace GC
{
enum Type
{
Unknown,
TooManyDeleteRange,
TooMuchOutOfRange,
TooManyInvalidVersion,
};

static std::string toString(Type type)
{
switch (type)
{
case TooManyDeleteRange:
return "TooManyDeleteRange";
case TooMuchOutOfRange:
return "TooMuchOutOfRange";
case TooManyInvalidVersion:
return "TooManyInvalidVersion";
default:
return "Unknown";
}
}

// Returns true if it needs gc.
// This is for optimization purpose, does not mean to be accurate.
bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log)
bool shouldCompactStableWithTooManyInvalidVersion(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log)
{
// Always GC.
if (ratio_threshold < 1.0)
Expand All @@ -271,7 +292,7 @@ bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, dou
return false;
}

bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, const LoggerPtr & log)
bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log)
{
auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range);
if (actual_delete_range.none())
Expand All @@ -291,8 +312,81 @@ bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapsh
// because before write apply snapshot file, it will write a delete range first, and will meet the following gc criteria.
// But the cost should be really minor because merge delta on an empty segment should be very fast.
// What's more, we can ignore this kind of delete range in future to avoid this extra gc.
bool should_compact = (delete_rows >= stable_rows * ratio_threshold) || (delete_bytes >= stable_bytes * ratio_threshold);
return should_compact;
return (delete_rows >= stable_rows * invalid_data_ratio_threshold) || (delete_bytes >= stable_bytes * invalid_data_ratio_threshold);
}

std::unordered_set<UInt64> getDMFileIDs(const SegmentPtr & seg)
{
std::unordered_set<UInt64> file_ids;
// We get the file ids in the segment no matter it is abandoned or not,
// because even it is abandoned, we don't know whether the new segment will ref the old dtfiles.
// So we just be conservative here to return the old file ids.
// This is ok because the next check will get the right file ids in such case.
if (seg)
{
const auto & dm_files = seg->getStable()->getDMFiles();
for (const auto & file : dm_files)
{
file_ids.emplace(file->fileId());
}
}
return file_ids;
}

bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & context, //
const SegmentPtr & seg,
const SegmentSnapshotPtr & snap,
const SegmentPtr & prev_seg,
const SegmentPtr & next_seg,
double invalid_data_ratio_threshold,
const LoggerPtr & log)
{
std::unordered_set<UInt64> prev_segment_file_ids = getDMFileIDs(prev_seg);
std::unordered_set<UInt64> next_segment_file_ids = getDMFileIDs(next_seg);
auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange());
// Do a quick check about whether the DTFile is completely included in the segment range
if (first_pack_included && last_pack_included)
{
seg->setValidDataRatioChecked();
return false;
}
bool contains_invalid_data = false;
const auto & dt_files = snap->stable->getDMFiles();
if (!first_pack_included)
{
auto first_file_id = dt_files[0]->fileId();
if (prev_segment_file_ids.count(first_file_id) == 0)
{
contains_invalid_data = true;
}
}
if (!last_pack_included)
{
auto last_file_id = dt_files[dt_files.size() - 1]->fileId();
if (next_segment_file_ids.count(last_file_id) == 0)
{
contains_invalid_data = true;
}
}
// Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
if (!contains_invalid_data)
{
return false;
}

size_t total_rows = 0;
size_t total_bytes = 0;
for (const auto & file : dt_files)
{
total_rows += file->getRows();
total_bytes += file->getBytes();
}
auto valid_rows = snap->stable->getRows();
auto valid_bytes = snap->stable->getBytes();

LOG_FMT_TRACE(log, "valid_rows [{}], valid_bytes [{}] total_rows [{}] total_bytes [{}]", valid_rows, valid_bytes, total_rows, total_bytes);
seg->setValidDataRatioChecked();
return (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold));
}
} // namespace GC

Expand Down Expand Up @@ -333,6 +427,8 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)

auto dm_context = newDMContext(global_context, global_context.getSettingsRef(), "onSyncGc");
SegmentPtr segment;
SegmentPtr prev_segment = nullptr;
SegmentPtr next_segment = nullptr;
SegmentSnapshotPtr segment_snap;
{
std::shared_lock lock(read_write_mutex);
Expand All @@ -349,6 +445,16 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
segment = segment_it->second;
next_gc_check_key = segment_it->first.toRowKeyValue();
segment_snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
auto next_segment_it = next(segment_it, 1);
if (next_segment_it != segments.end())
{
next_segment = next_segment_it->second;
}
if (segment_it != segments.begin())
{
auto prev_segment_it = prev(segment_it, 1);
prev_segment = prev_segment_it->second;
}
}

assert(segment != nullptr);
Expand All @@ -370,17 +476,36 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
try
{
// Check whether we should apply gc on this segment
auto invalid_data_ratio_threshold = global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc;
RUNTIME_ASSERT(invalid_data_ratio_threshold >= 0 && invalid_data_ratio_threshold <= 1);
bool should_compact = false;
GC::Type gc_type = GC::Type::Unknown;
if (GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
invalid_data_ratio_threshold,
log))
{
should_compact = true;
gc_type = GC::Type::TooManyDeleteRange;
}
else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
else if (!segment->isValidDataRatioChecked())
{
if (GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(
*dm_context,
segment,
segment_snap,
prev_segment,
next_segment,
invalid_data_ratio_threshold,
log))
{
should_compact = true;
gc_type = GC::Type::TooMuchOutOfRange;
}
}
else if (!should_compact && (segment->getLastCheckGCSafePoint() < gc_safe_point))
{
// Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
Expand All @@ -394,11 +519,15 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

should_compact = GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log);
if (GC::shouldCompactStableWithTooManyInvalidVersion(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log))
{
should_compact = true;
gc_type = GC::Type::TooManyInvalidVersion;
}
}
bool finish_gc_on_segment = false;
if (should_compact)
Expand All @@ -412,17 +541,19 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
finish_gc_on_segment = true;
LOG_FMT_DEBUG(
log,
"Finish GC-merge-delta, segment={} table={}",
"Finish GC-merge-delta, segment={} table={}, gc_type={}",
segment->simpleInfo(),
table_name);
table_name,
GC::toString(gc_type));
}
else
{
LOG_FMT_DEBUG(
log,
"GC aborted, segment={} table={}",
"GC aborted, segment={} table={}, gc_type={}",
segment->simpleInfo(),
table_name);
table_name,
GC::toString(gc_type));
}
}
if (!finish_gc_on_segment)
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ class Segment : private boost::noncopyable
bool isSplitForbidden() const { return split_forbidden; }
void forbidSplit() { split_forbidden = true; }

bool isValidDataRatioChecked() const { return check_valid_data_ratio.load(std::memory_order_relaxed); }
void setValidDataRatioChecked() { check_valid_data_ratio.store(true, std::memory_order_relaxed); }

void drop(const FileProviderPtr & file_provider, WriteBatches & wbs);

bool isFlushing() const { return delta->isFlushing(); }
Expand Down Expand Up @@ -424,6 +427,11 @@ class Segment : private boost::noncopyable
const StableValueSpacePtr stable;

bool split_forbidden = false;
// After logical split, it is very possible that only half of the data in the segment's DTFile is valid for this segment.
// So we want to do merge delta on this kind of segment to clean out the invalid data.
// This involves to check the valid data ratio in the background gc thread,
// and to avoid doing this check repeatedly, we add this flag to indicate whether the valid data ratio has already been checked.
std::atomic<bool> check_valid_data_ratio = false;

LoggerPtr log;
};
Expand Down
34 changes: 34 additions & 0 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,5 +397,39 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
return {approx_rows, approx_bytes};
}

std::pair<bool, bool> StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const
{
// Usually, this method will be called for some "cold" key ranges.
// Loading the index into cache may pollute the cache and make the hot index cache invalid.
// So don't refill the cache if the index does not exist.
bool first_pack_included = false;
bool last_pack_included = false;
for (size_t i = 0; i < stable->files.size(); i++)
{
const auto & file = stable->files[i];
auto filter = DMFilePackFilter::loadFrom(
file,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
/*set_cache_if_miss*/ false,
{range},
RSOperatorPtr{},
IdSetPtr{},
context.db_context.getFileProvider(),
context.getReadLimiter(),
context.tracing_id);
const auto & use_packs = filter.getUsePacks();
if (i == 0)
{
first_pack_included = use_packs[0];
}
if (i == stable->files.size() - 1)
{
last_pack_included = use_packs[use_packs.size() - 1];
}
}

return std::make_pair(first_pack_included, last_pack_included);
}

} // namespace DM
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>

RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;

std::pair<bool, bool> isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const;

private:
Poco::Logger * log;
};
Expand Down
Loading

0 comments on commit 1fdea3e

Please sign in to comment.