From 0d0cde5a1057144a058386fc31b94bcbf343e500 Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Thu, 15 Sep 2022 14:08:59 +0800 Subject: [PATCH] storage: require lock for segment operations (#5894) ref pingcap/tiflash#5237 --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.h | 4 ++-- .../DeltaMergeStore_InternalSegment.cpp | 6 +++--- dbms/src/Storages/DeltaMerge/Segment.cpp | 15 +++++++++------ dbms/src/Storages/DeltaMerge/Segment.h | 15 ++++++++++++++- .../DeltaMerge/tests/gtest_dm_segment.cpp | 10 +++++----- 5 files changed, 33 insertions(+), 17 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 7d4f9a6c1d9..acef24e0c42 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -512,11 +512,11 @@ class DeltaMergeStore : private boost::noncopyable bool handleBackgroundTask(bool heavy); // isSegmentValid should be protected by lock on `read_write_mutex` - inline bool isSegmentValid(std::shared_lock &, const SegmentPtr & segment) + inline bool isSegmentValid(const std::shared_lock &, const SegmentPtr & segment) { return doIsSegmentValid(segment); } - inline bool isSegmentValid(std::unique_lock &, const SegmentPtr & segment) + inline bool isSegmentValid(const std::unique_lock &, const SegmentPtr & segment) { return doIsSegmentValid(segment); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 39a6e58d0c6..5beaf23b3bb 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -116,7 +116,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP auto segment_lock = segment->mustGetUpdateLock(); - std::tie(new_left, new_right) = segment->applySplit(dm_context, segment_snap, wbs, split_info); + std::tie(new_left, new_right) = segment->applySplit(segment_lock, dm_context, segment_snap, wbs, split_info); wbs.writeMeta(); @@ -278,7 +278,7 @@ SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vect for (const auto & seg : ordered_segments) locks.emplace_back(seg->mustGetUpdateLock()); - merged = Segment::applyMerge(dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable); + merged = Segment::applyMerge(locks, dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable); wbs.writeMeta(); @@ -412,7 +412,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( auto segment_lock = segment->mustGetUpdateLock(); - new_segment = segment->applyMergeDelta(dm_context, segment_snap, wbs, new_stable); + new_segment = segment->applyMergeDelta(segment_lock, dm_context, segment_snap, wbs, new_stable); wbs.writeMeta(); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index e567322a0e8..fdaef7f8401 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -640,7 +640,7 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & SYNC_FOR("before_Segment::applyMergeDelta"); // pause without holding the lock on the segment auto lock = mustGetUpdateLock(); - auto new_segment = applyMergeDelta(dm_context, segment_snap, wbs, new_stable); + auto new_segment = applyMergeDelta(lock, dm_context, segment_snap, wbs, new_stable); wbs.writeAll(); return new_segment; @@ -674,7 +674,8 @@ StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, return new_stable; } -SegmentPtr Segment::applyMergeDelta(DMContext & context, +SegmentPtr Segment::applyMergeDelta(const Segment::Lock &, // + DMContext & context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs, const StableValueSpacePtr & new_stable) const @@ -731,7 +732,7 @@ SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & sche SYNC_FOR("before_Segment::applySplit"); // pause without holding the lock on the segment auto lock = mustGetUpdateLock(); - auto segment_pair = applySplit(dm_context, segment_snap, wbs, split_info); + auto segment_pair = applySplit(lock, dm_context, segment_snap, wbs, split_info); wbs.writeAll(); @@ -1141,7 +1142,8 @@ std::optional Segment::prepareSplitPhysical(DMContext & dm_c return {SplitInfo{false, split_point, my_new_stable, other_stable}}; } -SegmentPair Segment::applySplit(DMContext & dm_context, // +SegmentPair Segment::applySplit(const Segment::Lock &, // + DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs, SplitInfo & split_info) const @@ -1241,7 +1243,7 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem for (const auto & seg : ordered_segments) locks.emplace_back(seg->mustGetUpdateLock()); - auto merged = applyMerge(dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable); + auto merged = applyMerge(locks, dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable); wbs.writeAll(); return merged; @@ -1326,7 +1328,8 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // return merged_stable; } -SegmentPtr Segment::applyMerge(DMContext & dm_context, // +SegmentPtr Segment::applyMerge(const std::vector &, // + DMContext & dm_context, const std::vector & ordered_segments, const std::vector & ordered_snapshots, WriteBatches & wbs, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 4005c0fa431..422bb87b125 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -69,6 +69,7 @@ class Segment : private boost::noncopyable { public: using DeltaTree = DefaultDeltaTree; + using Lock = DeltaValueSpace::Lock; struct ReadInfo { @@ -207,7 +208,11 @@ class Segment : private boost::noncopyable const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const; + /** + * Should be protected behind the Segment update lock. + */ [[nodiscard]] SegmentPair applySplit( + const Lock &, DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs, @@ -229,7 +234,11 @@ class Segment : private boost::noncopyable const std::vector & ordered_snapshots, WriteBatches & wbs); + /** + * Should be protected behind the update lock for all related segments. + */ [[nodiscard]] static SegmentPtr applyMerge( + const std::vector &, DMContext & dm_context, const std::vector & ordered_segments, const std::vector & ordered_snapshots, @@ -247,7 +256,12 @@ class Segment : private boost::noncopyable const ColumnDefinesPtr & schema_snap, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const; + + /** + * Should be protected behind the Segment update lock. + */ [[nodiscard]] SegmentPtr applyMergeDelta( + const Lock &, DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs, @@ -283,7 +297,6 @@ class Segment : private boost::noncopyable static String simpleInfo(const std::vector & segments); static String info(const std::vector & segments); - using Lock = DeltaValueSpace::Lock; bool getUpdateLock(Lock & lock) const { return delta->getLock(lock); } Lock mustGetUpdateLock() const diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 79ae30298ff..b5de675d2ee 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1128,7 +1128,7 @@ try split_info->other_stable->enableDMFilesGC(); auto lock = segment->mustGetUpdateLock(); - std::tie(segment, other_segment) = segment->applySplit(dmContext(), segment_snap, wbs, split_info.value()); + std::tie(segment, other_segment) = segment->applySplit(lock, dmContext(), segment_snap, wbs, split_info.value()); wbs.writeAll(); } @@ -1157,10 +1157,10 @@ try wbs.writeLogAndData(); merged_stable->enableDMFilesGC(); - auto left_lock = segment->mustGetUpdateLock(); - auto right_lock = other_segment->mustGetUpdateLock(); - - segment = Segment::applyMerge(dmContext(), {segment, other_segment}, {left_snap, right_snap}, wbs, merged_stable); + std::vector locks; + locks.emplace_back(segment->mustGetUpdateLock()); + locks.emplace_back(other_segment->mustGetUpdateLock()); + segment = Segment::applyMerge(locks, dmContext(), {segment, other_segment}, {left_snap, right_snap}, wbs, merged_stable); wbs.writeAll(); }