Skip to content

Commit

Permalink
storage: require lock for segment operations (#5894)
Browse files Browse the repository at this point in the history
ref #5237
  • Loading branch information
breezewish authored Sep 15, 2022
1 parent f07fe50 commit 0d0cde5
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 17 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,11 @@ class DeltaMergeStore : private boost::noncopyable
bool handleBackgroundTask(bool heavy);

// isSegmentValid should be protected by lock on `read_write_mutex`
inline bool isSegmentValid(std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment)
inline bool isSegmentValid(const std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
return doIsSegmentValid(segment);
}
inline bool isSegmentValid(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
inline bool isSegmentValid(const std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
return doIsSegmentValid(segment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP

auto segment_lock = segment->mustGetUpdateLock();

std::tie(new_left, new_right) = segment->applySplit(dm_context, segment_snap, wbs, split_info);
std::tie(new_left, new_right) = segment->applySplit(segment_lock, dm_context, segment_snap, wbs, split_info);

wbs.writeMeta();

Expand Down Expand Up @@ -278,7 +278,7 @@ SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vect
for (const auto & seg : ordered_segments)
locks.emplace_back(seg->mustGetUpdateLock());

merged = Segment::applyMerge(dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);
merged = Segment::applyMerge(locks, dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);

wbs.writeMeta();

Expand Down Expand Up @@ -412,7 +412,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

auto segment_lock = segment->mustGetUpdateLock();

new_segment = segment->applyMergeDelta(dm_context, segment_snap, wbs, new_stable);
new_segment = segment->applyMergeDelta(segment_lock, dm_context, segment_snap, wbs, new_stable);

wbs.writeMeta();

Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr &
SYNC_FOR("before_Segment::applyMergeDelta"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto new_segment = applyMergeDelta(dm_context, segment_snap, wbs, new_stable);
auto new_segment = applyMergeDelta(lock, dm_context, segment_snap, wbs, new_stable);

wbs.writeAll();
return new_segment;
Expand Down Expand Up @@ -674,7 +674,8 @@ StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context,
return new_stable;
}

SegmentPtr Segment::applyMergeDelta(DMContext & context,
SegmentPtr Segment::applyMergeDelta(const Segment::Lock &, //
DMContext & context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
const StableValueSpacePtr & new_stable) const
Expand Down Expand Up @@ -731,7 +732,7 @@ SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & sche
SYNC_FOR("before_Segment::applySplit"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto segment_pair = applySplit(dm_context, segment_snap, wbs, split_info);
auto segment_pair = applySplit(lock, dm_context, segment_snap, wbs, split_info);

wbs.writeAll();

Expand Down Expand Up @@ -1141,7 +1142,8 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitPhysical(DMContext & dm_c
return {SplitInfo{false, split_point, my_new_stable, other_stable}};
}

SegmentPair Segment::applySplit(DMContext & dm_context, //
SegmentPair Segment::applySplit(const Segment::Lock &, //
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
SplitInfo & split_info) const
Expand Down Expand Up @@ -1241,7 +1243,7 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
for (const auto & seg : ordered_segments)
locks.emplace_back(seg->mustGetUpdateLock());

auto merged = applyMerge(dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);
auto merged = applyMerge(locks, dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);

wbs.writeAll();
return merged;
Expand Down Expand Up @@ -1326,7 +1328,8 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
return merged_stable;
}

SegmentPtr Segment::applyMerge(DMContext & dm_context, //
SegmentPtr Segment::applyMerge(const std::vector<Segment::Lock> &, //
DMContext & dm_context,
const std::vector<SegmentPtr> & ordered_segments,
const std::vector<SegmentSnapshotPtr> & ordered_snapshots,
WriteBatches & wbs,
Expand Down
15 changes: 14 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Segment : private boost::noncopyable
{
public:
using DeltaTree = DefaultDeltaTree;
using Lock = DeltaValueSpace::Lock;

struct ReadInfo
{
Expand Down Expand Up @@ -207,7 +208,11 @@ class Segment : private boost::noncopyable
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs) const;

/**
* Should be protected behind the Segment update lock.
*/
[[nodiscard]] SegmentPair applySplit(
const Lock &,
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
Expand All @@ -229,7 +234,11 @@ class Segment : private boost::noncopyable
const std::vector<SegmentSnapshotPtr> & ordered_snapshots,
WriteBatches & wbs);

/**
* Should be protected behind the update lock for all related segments.
*/
[[nodiscard]] static SegmentPtr applyMerge(
const std::vector<Lock> &,
DMContext & dm_context,
const std::vector<SegmentPtr> & ordered_segments,
const std::vector<SegmentSnapshotPtr> & ordered_snapshots,
Expand All @@ -247,7 +256,12 @@ class Segment : private boost::noncopyable
const ColumnDefinesPtr & schema_snap,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs) const;

/**
* Should be protected behind the Segment update lock.
*/
[[nodiscard]] SegmentPtr applyMergeDelta(
const Lock &,
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
Expand Down Expand Up @@ -283,7 +297,6 @@ class Segment : private boost::noncopyable
static String simpleInfo(const std::vector<SegmentPtr> & segments);
static String info(const std::vector<SegmentPtr> & segments);

using Lock = DeltaValueSpace::Lock;
bool getUpdateLock(Lock & lock) const { return delta->getLock(lock); }

Lock mustGetUpdateLock() const
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ try
split_info->other_stable->enableDMFilesGC();

auto lock = segment->mustGetUpdateLock();
std::tie(segment, other_segment) = segment->applySplit(dmContext(), segment_snap, wbs, split_info.value());
std::tie(segment, other_segment) = segment->applySplit(lock, dmContext(), segment_snap, wbs, split_info.value());

wbs.writeAll();
}
Expand Down Expand Up @@ -1157,10 +1157,10 @@ try
wbs.writeLogAndData();
merged_stable->enableDMFilesGC();

auto left_lock = segment->mustGetUpdateLock();
auto right_lock = other_segment->mustGetUpdateLock();

segment = Segment::applyMerge(dmContext(), {segment, other_segment}, {left_snap, right_snap}, wbs, merged_stable);
std::vector<Segment::Lock> locks;
locks.emplace_back(segment->mustGetUpdateLock());
locks.emplace_back(other_segment->mustGetUpdateLock());
segment = Segment::applyMerge(locks, dmContext(), {segment, other_segment}, {left_snap, right_snap}, wbs, merged_stable);

wbs.writeAll();
}
Expand Down

0 comments on commit 0d0cde5

Please sign in to comment.