diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
index 0627734ccd2..43c10d3e2eb 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
@@ -25,7 +25,6 @@ extern const Metric DT_SnapshotOfDeltaMerge;
 
 namespace DB
 {
-
 namespace FailPoints
 {
 extern const char pause_before_dt_background_delta_merge[];
@@ -34,7 +33,6 @@ extern const char pause_until_dt_background_delta_merge[];
 
 namespace DM
 {
-
 void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
 {
     // Callbacks for cleaning outdated DTFiles. Note that there is a chance
@@ -249,9 +247,34 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
 
 namespace GC
 {
+// The reason why the background GC decided to compact a segment. Reported in the GC log messages.
+enum Type
+{
+    Unknown,
+    TooManyDeleteRange,
+    TooMuchOutOfRange,
+    TooManyInvalidVersion,
+};
+
+// Returns the name of `type` for log messages. Any value without a dedicated case maps to "Unknown".
+static std::string toString(Type type)
+{
+    switch (type)
+    {
+    case TooManyDeleteRange:
+        return "TooManyDeleteRange";
+    case TooMuchOutOfRange:
+        return "TooMuchOutOfRange";
+    case TooManyInvalidVersion:
+        return "TooManyInvalidVersion";
+    default:
+        return "Unknown";
+    }
+}
+
 // Returns true if it needs gc.
 // This is for optimization purpose, does not mean to be accurate.
-bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log)
+bool shouldCompactStableWithTooManyInvalidVersion(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log)
 {
     // Always GC.
     if (ratio_threshold < 1.0)
@@ -271,7 +294,7 @@ bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, dou
     return false;
 }
 
-bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, const LoggerPtr & log)
+bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log)
 {
     auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range);
     if (actual_delete_range.none())
@@ -291,8 +314,92 @@ bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapsh
     // because before write apply snapshot file, it will write a delete range first, and will meet the following gc criteria.
     // But the cost should be really minor because merge delta on an empty segment should be very fast.
     // What's more, we can ignore this kind of delete range in future to avoid this extra gc.
-    bool should_compact = (delete_rows >= stable_rows * ratio_threshold) || (delete_bytes >= stable_bytes * ratio_threshold);
-    return should_compact;
+    return (delete_rows >= stable_rows * invalid_data_ratio_threshold) || (delete_bytes >= stable_bytes * invalid_data_ratio_threshold);
+}
+
+// Returns the ids of the DTFiles referenced by the stable of `seg`, or an empty set when `seg` is null.
+std::unordered_set<UInt64> getDMFileIDs(const SegmentPtr & seg)
+{
+    std::unordered_set<UInt64> file_ids;
+    // We get the file ids in the segment no matter it is abandoned or not,
+    // because even it is abandoned, we don't know whether the new segment will ref the old dtfiles.
+    // So we just be conservative here to return the old file ids.
+    // This is ok because the next check will get the right file ids in such case.
+    if (seg)
+    {
+        const auto & dm_files = seg->getStable()->getDMFiles();
+        for (const auto & file : dm_files)
+        {
+            file_ids.emplace(file->fileId());
+        }
+    }
+    return file_ids;
+}
+
+// Returns true if the stable of `seg` keeps data outside the segment range that is not shared with
+// `prev_seg` / `next_seg`, and its valid rows or bytes are below `(1 - invalid_data_ratio_threshold)`
+// of the DTFile totals. `prev_seg` and `next_seg` may be null when there is no neighbor on that side.
+bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & context, //
+                                                         const SegmentPtr & seg,
+                                                         const SegmentSnapshotPtr & snap,
+                                                         const SegmentPtr & prev_seg,
+                                                         const SegmentPtr & next_seg,
+                                                         double invalid_data_ratio_threshold,
+                                                         const LoggerPtr & log)
+{
+    const auto & dt_files = snap->stable->getDMFiles();
+    // Without any DTFile there is no data out of range, and `front()` / `back()` below must not be called.
+    if (dt_files.empty())
+    {
+        seg->setValidDataRatioChecked();
+        return false;
+    }
+
+    std::unordered_set<UInt64> prev_segment_file_ids = getDMFileIDs(prev_seg);
+    std::unordered_set<UInt64> next_segment_file_ids = getDMFileIDs(next_seg);
+    auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange());
+    // Do a quick check about whether the DTFile is completely included in the segment range
+    if (first_pack_included && last_pack_included)
+    {
+        seg->setValidDataRatioChecked();
+        return false;
+    }
+    bool contains_invalid_data = false;
+    if (!first_pack_included)
+    {
+        auto first_file_id = dt_files.front()->fileId();
+        if (prev_segment_file_ids.count(first_file_id) == 0)
+        {
+            contains_invalid_data = true;
+        }
+    }
+    if (!last_pack_included)
+    {
+        auto last_file_id = dt_files.back()->fileId();
+        if (next_segment_file_ids.count(last_file_id) == 0)
+        {
+            contains_invalid_data = true;
+        }
+    }
+    // Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
+    if (!contains_invalid_data)
+    {
+        return false;
+    }
+
+    size_t total_rows = 0;
+    size_t total_bytes = 0;
+    for (const auto & file : dt_files)
+    {
+        total_rows += file->getRows();
+        total_bytes += file->getBytes();
+    }
+    auto valid_rows = snap->stable->getRows();
+    auto valid_bytes = snap->stable->getBytes();
+
+    LOG_FMT_TRACE(log, "valid_rows [{}], valid_bytes [{}] total_rows [{}] total_bytes [{}]", valid_rows, valid_bytes, total_rows, total_bytes);
+    seg->setValidDataRatioChecked();
+    return (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold));
 }
 
 } // namespace GC
@@ -333,6 +440,8 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
         auto dm_context = newDMContext(global_context, global_context.getSettingsRef(), "onSyncGc");
 
         SegmentPtr segment;
+        SegmentPtr prev_segment = nullptr;
+        SegmentPtr next_segment = nullptr;
         SegmentSnapshotPtr segment_snap;
         {
             std::shared_lock lock(read_write_mutex);
@@ -349,6 +458,16 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
             segment = segment_it->second;
             next_gc_check_key = segment_it->first.toRowKeyValue();
             segment_snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+            auto next_segment_it = std::next(segment_it);
+            if (next_segment_it != segments.end())
+            {
+                next_segment = next_segment_it->second;
+            }
+            if (segment_it != segments.begin())
+            {
+                auto prev_segment_it = std::prev(segment_it);
+                prev_segment = prev_segment_it->second;
+            }
         }
 
         assert(segment != nullptr);
@@ -370,17 +489,38 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
         try
         {
             // Check whether we should apply gc on this segment
+            auto invalid_data_ratio_threshold = global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc;
+            RUNTIME_ASSERT(invalid_data_ratio_threshold >= 0 && invalid_data_ratio_threshold <= 1);
             bool should_compact = false;
+            GC::Type gc_type = GC::Type::Unknown;
             if (GC::shouldCompactDeltaWithStable(
                     *dm_context,
                     segment_snap,
                     segment_range,
-                    global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
+                    invalid_data_ratio_threshold,
                     log))
             {
                 should_compact = true;
+                gc_type = GC::Type::TooManyDeleteRange;
             }
-            else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
+            // The remaining checks are separate `if`s rather than an `else if` chain: when the out-of-range
+            // check runs without asking for a compaction, the invalid-version check must still be evaluated.
+            if (!should_compact && !segment->isValidDataRatioChecked())
+            {
+                if (GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(
+                        *dm_context,
+                        segment,
+                        segment_snap,
+                        prev_segment,
+                        next_segment,
+                        invalid_data_ratio_threshold,
+                        log))
+                {
+                    should_compact = true;
+                    gc_type = GC::Type::TooMuchOutOfRange;
+                }
+            }
+            if (!should_compact && (segment->getLastCheckGCSafePoint() < gc_safe_point))
             {
                 // Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not.
                 // Because after we calculate StableProperty and compare it with this gc_safe_point,
@@ -394,11 +534,15 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
                 if (!segment->getStable()->isStablePropertyCached())
                     segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());
 
-                should_compact = GC::shouldCompactStable(
-                    segment,
-                    gc_safe_point,
-                    global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
-                    log);
+                if (GC::shouldCompactStableWithTooManyInvalidVersion(
+                        segment,
+                        gc_safe_point,
+                        global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
+                        log))
+                {
+                    should_compact = true;
+                    gc_type = GC::Type::TooManyInvalidVersion;
+                }
             }
             bool finish_gc_on_segment = false;
             if (should_compact)
@@ -412,17 +556,19 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
                     finish_gc_on_segment = true;
                     LOG_FMT_DEBUG(
                         log,
-                        "Finish GC-merge-delta, segment={} table={}",
+                        "Finish GC-merge-delta, segment={} table={}, gc_type={}",
                         segment->simpleInfo(),
-                        table_name);
+                        table_name,
+                        GC::toString(gc_type));
                 }
                 else
                 {
                     LOG_FMT_DEBUG(
                         log,
-                        "GC aborted, segment={} table={}",
+                        "GC aborted, segment={} table={}, gc_type={}",
                         segment->simpleInfo(),
-                        table_name);
+                        table_name,
+                        GC::toString(gc_type));
                 }
             }
             if (!finish_gc_on_segment)
diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h
index aaf0755025f..4005c0fa431 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.h
+++ b/dbms/src/Storages/DeltaMerge/Segment.h
@@ -311,6 +311,9 @@ class Segment : private boost::noncopyable
     bool isSplitForbidden() const { return split_forbidden; }
     void forbidSplit() { split_forbidden = true; }
 
+    bool isValidDataRatioChecked() const { return check_valid_data_ratio.load(std::memory_order_relaxed); }
+    void setValidDataRatioChecked() { check_valid_data_ratio.store(true, std::memory_order_relaxed); }
+
     void drop(const FileProviderPtr & file_provider, WriteBatches & wbs);
 
     bool isFlushing() const { return delta->isFlushing(); }
@@ -424,6 +427,11 @@ class Segment : private boost::noncopyable
     const StableValueSpacePtr stable;
 
     bool split_forbidden = false;
+    // After logical split, it is very possible that only half of the data in the segment's DTFile is valid for this segment.
+    // So we want to do merge delta on this kind of segment to clean out the invalid data.
+    // This involves to check the valid data ratio in the background gc thread,
+    // and to avoid doing this check repeatedly, we add this flag to indicate whether the valid data ratio has already been checked.
+    std::atomic<bool> check_valid_data_ratio = false;
 
     LoggerPtr log;
 };
diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
index 9827655a265..2dc338b557e 100644
--- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
+++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
@@ -397,5 +397,47 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
     return {approx_rows, approx_bytes};
 }
 
+// Returns {first, last}: whether the first pack of the first DTFile and the last pack of the last DTFile
+// are selected by the pack filter built for `range`. A stable without files, or a file without packs,
+// is reported as included, so callers never index into an empty file or pack list.
+std::pair<bool, bool> StableValueSpace::Snapshot::isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const
+{
+    // Usually, this method will be called for some "cold" key ranges.
+    // Loading the index into cache may pollute the cache and make the hot index cache invalid.
+    // So don't refill the cache if the index does not exist.
+    bool first_pack_included = true;
+    bool last_pack_included = true;
+    const size_t file_count = stable->files.size();
+    for (size_t i = 0; i < file_count; i++)
+    {
+        // Only the first and the last file contribute to the result, so skip loading the filter of the others.
+        if (i != 0 && i + 1 != file_count)
+            continue;
+
+        const auto & file = stable->files[i];
+        auto filter = DMFilePackFilter::loadFrom(
+            file,
+            context.db_context.getGlobalContext().getMinMaxIndexCache(),
+            /*set_cache_if_miss*/ false,
+            {range},
+            RSOperatorPtr{},
+            IdSetPtr{},
+            context.db_context.getFileProvider(),
+            context.getReadLimiter(),
+            context.tracing_id);
+        const auto & use_packs = filter.getUsePacks();
+        if (i == 0)
+        {
+            first_pack_included = use_packs.empty() || use_packs.front();
+        }
+        if (i + 1 == file_count)
+        {
+            last_pack_included = use_packs.empty() || use_packs.back();
+        }
+    }
+
+    return std::make_pair(first_pack_included, last_pack_included);
+}
+
 } // namespace DM
 } // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h
index 1ab6449dc01..f9006058e7a 100644
--- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h
+++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h
@@ -156,6 +156,10 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
 
         RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;
 
+        // Returns {first, last}: whether the first pack of the first DTFile and the last pack of the
+        // last DTFile are selected by the pack filter built for `range`.
+        std::pair<bool, bool> isFirstAndLastPackIncludedInRange(const DMContext & context, const RowKeyRange & range) const;
+
     private:
         Poco::Logger * log;
     };
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
index 1714eb7b62e..3a776e82ad3 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
@@ -24,9 +24,13 @@
 
 #include
 
-namespace DB
+namespace CurrentMetrics
 {
+extern const Metric DT_SnapshotOfDeltaMerge;
+} // namespace CurrentMetrics
 
+namespace DB
+{
 namespace FailPoints
 {
 extern const char try_segment_logical_split[];
@@ -35,6 +39,16 @@ extern const char force_segment_logical_split[];
 
 namespace DM
 {
+namespace GC
+{
+bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & context, //
+                                                         const SegmentPtr & seg,
+                                                         const SegmentSnapshotPtr & snap,
+                                                         const SegmentPtr & prev_seg,
+                                                         const SegmentPtr & next_seg,
+                                                         double invalid_data_ratio_threshold,
+                                                         const LoggerPtr & log);
+}
 namespace tests
 {
 class SegmentOperationTest : public SegmentTestBasic
@@ -432,6 +446,75 @@ try
 }
 CATCH
 
+TEST_F(SegmentOperationTest, GCCheckAfterSegmentLogicalSplit)
+try
+{
+    {
+        SegmentTestOptions options;
+        options.db_settings.dt_segment_stable_pack_rows = 100;
+        reloadWithOptions(options);
+    }
+
+    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 1000);
+    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
+    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);
+    auto invalid_data_ratio_threshold = dm_context->db_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc;
+    // Before any split, the single segment must not be reported for compaction.
+    {
+        auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID];
+        auto snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, segment, snap, /* prev_seg */ nullptr, /* next_seg */ nullptr, invalid_data_ratio_threshold, log));
+    }
+
+    // After a forced logical split, neither of the two segments is expected to need compaction.
+    FailPointHelper::enableFailPoint(FailPoints::force_segment_logical_split);
+    auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID);
+    ASSERT_TRUE(new_seg_id_opt.has_value());
+    auto left_segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
+    auto right_segment_id = new_seg_id_opt.value();
+    {
+        auto left_segment = segments[left_segment_id];
+        auto right_segment = segments[right_segment_id];
+        auto left_snap = left_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        auto right_snap = right_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, left_segment, left_snap, /* prev_seg */ nullptr, /* next_seg */ right_segment, invalid_data_ratio_threshold, log));
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, right_segment, right_snap, /* prev_seg */ left_segment, /* next_seg */ nullptr, invalid_data_ratio_threshold, log));
+    }
+
+    // Split the first segment again to get three segments; still no compaction is expected.
+    FailPointHelper::enableFailPoint(FailPoints::force_segment_logical_split);
+    auto new_seg_id_opt2 = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID);
+    ASSERT_TRUE(new_seg_id_opt2.has_value());
+    auto middle_segment_id = new_seg_id_opt2.value();
+    {
+        auto left_segment = segments[left_segment_id];
+        auto middle_segment = segments[middle_segment_id];
+        auto right_segment = segments[right_segment_id];
+        auto left_snap = left_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        auto middle_snap = middle_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        auto right_snap = right_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, left_segment, left_snap, /* prev_seg */ nullptr, /* next_seg */ middle_segment, invalid_data_ratio_threshold, log));
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, middle_segment, middle_snap, /* prev_seg */ left_segment, /* next_seg */ right_segment, invalid_data_ratio_threshold, log));
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, right_segment, right_snap, /* prev_seg */ middle_segment, /* next_seg */ nullptr, invalid_data_ratio_threshold, log));
+    }
+
+    // merge delta left segment and check again
+    // Only the middle segment is now expected to be reported for compaction.
+    mergeSegmentDelta(left_segment_id);
+    {
+        auto left_segment = segments[left_segment_id];
+        auto middle_segment = segments[middle_segment_id];
+        auto right_segment = segments[right_segment_id];
+        auto left_snap = left_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        auto middle_snap = middle_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        auto right_snap = right_segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, left_segment, left_snap, /* prev_seg */ nullptr, /* next_seg */ middle_segment, invalid_data_ratio_threshold, log));
+        ASSERT_TRUE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, middle_segment, middle_snap, /* prev_seg */ left_segment, /* next_seg */ right_segment, invalid_data_ratio_threshold, log));
+        ASSERT_FALSE(GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(*dm_context, right_segment, right_snap, /* prev_seg */ middle_segment, /* next_seg */ nullptr, invalid_data_ratio_threshold, log));
+    }
+}
+CATCH
+
 TEST_F(SegmentOperationTest, Issue5570)
 try
 {