Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean up segments with large space amplification ratio generated by logical split #5860

Merged
merged 8 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 148 additions & 17 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ extern const Metric DT_SnapshotOfDeltaMerge;

namespace DB
{

namespace FailPoints
{
extern const char pause_before_dt_background_delta_merge[];
Expand All @@ -34,7 +33,6 @@ extern const char pause_until_dt_background_delta_merge[];

namespace DM
{

void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
Expand Down Expand Up @@ -249,9 +247,32 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)

namespace GC
{
// Reason why the background GC decided to compact a segment.
// The value is only used for reporting: it is converted with toString() and appended to the GC log lines.
enum Type
{
// No trigger has been identified; this is the value a GC round starts with.
Unknown,
// Set when GC::shouldCompactDeltaWithStable() returns true.
TooManyDeleteRange,
// Set when GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange() returns true.
TooMuchOutOfRange,
// Set when GC::shouldCompactStableWithTooManyInvalidVersion() returns true.
TooManyInvalidVersion,
};

// Maps a GC trigger type to the name printed in the GC log lines.
// Any value without a dedicated name (including Unknown) is reported as "Unknown".
static std::string toString(Type type)
{
    if (type == TooManyDeleteRange)
        return "TooManyDeleteRange";
    if (type == TooMuchOutOfRange)
        return "TooMuchOutOfRange";
    if (type == TooManyInvalidVersion)
        return "TooManyInvalidVersion";
    return "Unknown";
}

// Returns true if it needs gc.
// This is for optimization purpose, does not mean to be accurate.
bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log)
bool shouldCompactStableWithTooManyInvalidVersion(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log)
{
// Always GC.
if (ratio_threshold < 1.0)
Expand All @@ -271,7 +292,7 @@ bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, dou
return false;
}

bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, const LoggerPtr & log)
bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log)
{
auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range);
if (actual_delete_range.none())
Expand All @@ -291,8 +312,81 @@ bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapsh
// because before write apply snapshot file, it will write a delete range first, and will meet the following gc criteria.
// But the cost should be really minor because merge delta on an empty segment should be very fast.
// What's more, we can ignore this kind of delete range in future to avoid this extra gc.
bool should_compact = (delete_rows >= stable_rows * ratio_threshold) || (delete_bytes >= stable_bytes * ratio_threshold);
return should_compact;
return (delete_rows >= stable_rows * invalid_data_ratio_threshold) || (delete_bytes >= stable_bytes * invalid_data_ratio_threshold);
}

std::unordered_set<UInt64> getDMFileIDs(const SegmentPtr & seg)
{
std::unordered_set<UInt64> file_ids;
// We get the file ids in the segment no matter it is abandoned or not,
// because even it is abandoned, we don't know whether the new segment will ref the old dtfiles.
// So we just be conservative here to return the old file ids.
// This is ok because the next check will get the right file ids in such case.
if (seg)
{
const auto & dm_files = seg->getStable()->getDMFiles();
for (const auto & file : dm_files)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I'm curious, when will we have multi DTfiles in the stable currently?

Copy link
Contributor Author

@lidezhu lidezhu Sep 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there is just one DTFile now. But we keep the interface like this to avoid the assumption that there is only one DTFile. So we can support multiple DTFiles in a stable easier in the future if needed.

{
file_ids.emplace(file->fileId());
}
}
return file_ids;
}

bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & context, //
const SegmentPtr & seg,
const SegmentSnapshotPtr & snap,
const SegmentPtr & prev_seg,
const SegmentPtr & next_seg,
double invalid_data_ratio_threshold,
const LoggerPtr & log)
{
std::unordered_set<UInt64> prev_segment_file_ids = getDMFileIDs(prev_seg);
std::unordered_set<UInt64> next_segment_file_ids = getDMFileIDs(next_seg);
auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange());
// Do a quick check about whether the DTFile is completely included in the segment range
if (first_pack_included && last_pack_included)
{
seg->setValidDataRatioChecked();
return false;
}
bool contains_invalid_data = false;
const auto & dt_files = snap->stable->getDMFiles();
if (!first_pack_included)
{
auto first_file_id = dt_files[0]->fileId();
if (prev_segment_file_ids.count(first_file_id) == 0)
{
contains_invalid_data = true;
}
}
if (!last_pack_included)
{
auto last_file_id = dt_files[dt_files.size() - 1]->fileId();
if (next_segment_file_ids.count(last_file_id) == 0)
{
contains_invalid_data = true;
}
}
// Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
if (!contains_invalid_data)
{
return false;
}

size_t total_rows = 0;
size_t total_bytes = 0;
for (const auto & file : dt_files)
{
total_rows += file->getRows();
total_bytes += file->getBytes();
}
auto valid_rows = snap->stable->getRows();
auto valid_bytes = snap->stable->getBytes();

LOG_FMT_TRACE(log, "valid_rows [{}], valid_bytes [{}] total_rows [{}] total_bytes [{}]", valid_rows, valid_bytes, total_rows, total_bytes);
seg->setValidDataRatioChecked();
return (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold));
Copy link
Member

@breezewish breezewish Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that, with logical split enabled, logical split becomes nearly useless (and very similar to physical split), because each segment produced by a logical split usually contains only 50% of the data (which is < the 70% threshold)?

Consider the workload:

  1. Write a lot of data to one segment
  2. Segment is logical split into two
  3. (No further writes to these two segment)

In this PR, the two segments each contain 50% of the data, so they both trigger GC.

However, actually in this workload GC will not reclaim further spaces, because all data in the underlying DTFile is "useful".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the heavy work is moved to the background GC thread. And I thought this was exactly the solution that we talked about previously.

Copy link
Member

@breezewish breezewish Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but maybe a better way would be to count by DTFile: for example, check whether a DTFile is < 70% utilized. When a DTFile is < 70% utilized, delta-merging all segments that use this DTFile can reclaim 30% of the space. Otherwise, delta-merging these segments will not have notable benefits.

However, the newly proposed check could be more complex (it definitely needs iterating all segments for multiple rounds) and I'm not sure whether covering this case is useful. @JaySon-Huang @flowbehappy What do you think?

Copy link
Contributor Author

@lidezhu lidezhu Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. This is a better and more complex solution.

Copy link
Contributor Author

@lidezhu lidezhu Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a check for whether this segment shares any DTFile with its neighbor segments before checking the invalid data ratio.
So for segments generated by logical split, the clean work is only done when no neighbor shares the same DTFile.

For DTFile which is shared by two segments, this is exactly the correct behavior we want.

For DTFile which is shared by more than two segments,

  1. this is really a rare case, because it is hard to do yet another logical split on a segment which was itself generated by a logical split (see Segment::getSplitPointFast);
  2. even if it happens, once one of the segments is updated and no longer refs the shared DTFile, it is beneficial to do the clean work on the remaining segments;

So I think the current solution is good enough for our purpose.

}
} // namespace GC

Expand Down Expand Up @@ -333,6 +427,8 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)

auto dm_context = newDMContext(global_context, global_context.getSettingsRef(), "onSyncGc");
SegmentPtr segment;
SegmentPtr prev_segment = nullptr;
SegmentPtr next_segment = nullptr;
SegmentSnapshotPtr segment_snap;
{
std::shared_lock lock(read_write_mutex);
Expand All @@ -349,6 +445,16 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
segment = segment_it->second;
next_gc_check_key = segment_it->first.toRowKeyValue();
segment_snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
auto next_segment_it = next(segment_it, 1);
if (next_segment_it != segments.end())
{
next_segment = next_segment_it->second;
}
if (segment_it != segments.begin())
{
auto prev_segment_it = prev(segment_it, 1);
prev_segment = prev_segment_it->second;
}
}

assert(segment != nullptr);
Expand All @@ -370,17 +476,36 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
try
{
// Check whether we should apply gc on this segment
auto invalid_data_ratio_threshold = global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc;
RUNTIME_ASSERT(invalid_data_ratio_threshold >= 0 && invalid_data_ratio_threshold <= 1);
bool should_compact = false;
GC::Type gc_type = GC::Type::Unknown;
if (GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
invalid_data_ratio_threshold,
log))
{
should_compact = true;
gc_type = GC::Type::TooManyDeleteRange;
}
else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
else if (!segment->isValidDataRatioChecked())
{
if (GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(
*dm_context,
segment,
segment_snap,
prev_segment,
next_segment,
invalid_data_ratio_threshold,
log))
{
should_compact = true;
gc_type = GC::Type::TooMuchOutOfRange;
}
}
else if (!should_compact && (segment->getLastCheckGCSafePoint() < gc_safe_point))
{
// Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
Expand All @@ -394,11 +519,15 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

should_compact = GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log);
if (GC::shouldCompactStableWithTooManyInvalidVersion(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log))
{
should_compact = true;
gc_type = GC::Type::TooManyInvalidVersion;
}
}
bool finish_gc_on_segment = false;
if (should_compact)
Expand All @@ -412,17 +541,19 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
finish_gc_on_segment = true;
LOG_FMT_DEBUG(
log,
"Finish GC-merge-delta, segment={} table={}",
"Finish GC-merge-delta, segment={} table={}, gc_type={}",
segment->simpleInfo(),
table_name);
table_name,
GC::toString(gc_type));
}
else
{
LOG_FMT_DEBUG(
log,
"GC aborted, segment={} table={}",
"GC aborted, segment={} table={}, gc_type={}",
segment->simpleInfo(),
table_name);
table_name,
GC::toString(gc_type));
}
}
if (!finish_gc_on_segment)
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ class Segment : private boost::noncopyable
/// Returns the current value of the `split_forbidden` flag.
bool isSplitForbidden() const
{
    return split_forbidden;
}
/// Sets the `split_forbidden` flag to true.
void forbidSplit()
{
    split_forbidden = true;
}

/// Returns whether the valid-data-ratio check has already been recorded for this segment.
/// The flag is read with relaxed memory ordering.
bool isValidDataRatioChecked() const
{
    return check_valid_data_ratio.load(std::memory_order_relaxed);
}
/// Records that the valid-data-ratio check has been done for this segment.
/// The flag is written with relaxed memory ordering.
void setValidDataRatioChecked()
{
    check_valid_data_ratio.store(true, std::memory_order_relaxed);
}

void drop(const FileProviderPtr & file_provider, WriteBatches & wbs);

/// Forwards to `delta->isFlushing()`.
bool isFlushing() const
{
    return delta->isFlushing();
}
Expand Down Expand Up @@ -424,6 +427,11 @@ class Segment : private boost::noncopyable
const StableValueSpacePtr stable;

bool split_forbidden = false;
// After logical split, it is very possible that only half of the data in the segment's DTFile is valid for this segment.
// So we want to do merge delta on this kind of segment to clean out the invalid data.
// This involves checking the valid data ratio in the background gc thread.
// To avoid doing this check repeatedly, this flag records whether the valid data ratio has already been checked.
std::atomic<bool> check_valid_data_ratio = false;
lidezhu marked this conversation as resolved.
Show resolved Hide resolved

LoggerPtr log;
};
Expand Down
34 changes: 34 additions & 0 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,5 +397,39 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
return {approx_rows, approx_bytes};
}

/// Reports whether the boundary packs of this stable snapshot are selected by the pack filter built for `range`.
///
/// Returns `{first_pack_included, last_pack_included}`:
///  - `first_pack_included`: the first pack of the first DTFile is used for `range`;
///  - `last_pack_included`: the last pack of the last DTFile is used for `range`.
/// A boundary file without any pack is reported as included, because there is no pack that could lie outside `range`.
/// When the snapshot has no DTFile at all, both flags stay false.
std::pair<bool, bool> StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const
{
    // Usually, this method will be called for some "cold" key ranges.
    // Loading the index into cache may pollute the cache and make the hot index cache invalid.
    // So don't refill the cache if the index does not exist.
    bool first_pack_included = false;
    bool last_pack_included = false;
    const size_t file_count = stable->files.size();
    for (size_t i = 0; i < file_count; ++i)
    {
        const auto & file = stable->files[i];
        auto filter = DMFilePackFilter::loadFrom(
            file,
            context.db_context.getGlobalContext().getMinMaxIndexCache(),
            /*set_cache_if_miss*/ false,
            {range},
            RSOperatorPtr{},
            IdSetPtr{},
            context.db_context.getFileProvider(),
            context.getReadLimiter(),
            context.tracing_id);
        const auto & use_packs = filter.getUsePacks();
        // Never index into an empty pack list: `use_packs[0]` and `use_packs[size() - 1]` would read out of bounds.
        // An empty file is treated as "included" so that it is not reported as holding out-of-range data.
        const bool no_packs = use_packs.empty();
        if (i == 0)
        {
            first_pack_included = no_packs || use_packs[0];
        }
        if (i + 1 == file_count)
        {
            last_pack_included = no_packs || use_packs[use_packs.size() - 1];
        }
    }

    return std::make_pair(first_pack_included, last_pack_included);
}

} // namespace DM
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>

RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;

std::pair<bool, bool> isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const;

private:
Poco::Logger * log;
};
Expand Down
Loading