storage: Merge multiple segments in gc thread #5863

Merged
merged 27 commits into from
Sep 15, 2022
Changes from 22 commits
Commits
0bfd061
move ingest out
breezewish Sep 9, 2022
9d52cdc
move internal segment ops out
breezewish Sep 9, 2022
72b9c28
Re-format
breezewish Sep 9, 2022
7de2a21
Avoid including DeltaMergeStore.h
breezewish Sep 9, 2022
366cb81
Fix (1)
breezewish Sep 9, 2022
d720dee
Merge remote-tracking branch 'origin/master' into wenxuan/dm_pimpl
breezewish Sep 9, 2022
47ea426
wip
breezewish Sep 12, 2022
2243b0e
grumble
breezewish Sep 12, 2022
31c8e63
protect against non-insertable segment ranges
breezewish Sep 12, 2022
56b9a43
Add unit tests for segment merge
breezewish Sep 12, 2022
6d5ff10
Reformat
breezewish Sep 12, 2022
a9008d1
Merge remote-tracking branch 'origin/master' into wenxuan/merge_multiple
breezewish Sep 12, 2022
9a97537
Fix issues reported by static analysis
breezewish Sep 13, 2022
6b05d0e
Merge branch 'master' into wenxuan/merge_multiple
JaySon-Huang Sep 13, 2022
9b4f09e
Address comments
breezewish Sep 13, 2022
4f6235e
refactor: Move storage bg out
breezewish Sep 13, 2022
e774078
Merge remote-tracking branch 'origin/master' into wenxuan/split_bg
breezewish Sep 13, 2022
d1f7d25
storage: merge in gc thread
breezewish Sep 13, 2022
3aa728c
Merge remote-tracking branch 'origin/master' into wenxuan/merge_in_gc
breezewish Sep 13, 2022
daa0eb8
Add tests
breezewish Sep 13, 2022
1466932
Merge remote-tracking branch 'origin/master' into wenxuan/merge_in_gc
breezewish Sep 14, 2022
69cd880
Verify the snapshot and the segment as usual
breezewish Sep 15, 2022
436a61c
Address comments
breezewish Sep 15, 2022
e37ff2d
Merge branch 'master' into wenxuan/merge_in_gc
lidezhu Sep 15, 2022
4b53fea
Merge branch 'master' into wenxuan/merge_in_gc
breezewish Sep 15, 2022
df80965
Merge remote-tracking branch 'origin/master' into wenxuan/merge_in_gc
breezewish Sep 15, 2022
e4ffe56
Merge branch 'master' into wenxuan/merge_in_gc
ti-chi-bot Sep 15, 2022
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
@@ -79,6 +79,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
M(gc_skip_update_safe_point) \
M(force_set_page_file_write_errno) \
M(force_split_io_size_4k) \
M(minimum_block_size_for_cross_join) \
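
The hunk above adds one entry to a list macro. As a minimal sketch of how such a list is typically consumed (the X-macro pattern), the example below expands a short list twice. `APPLY_FOR_DEMO_POINTS`, the `demo` namespace, and `allNames` are hypothetical names for illustration; this is not the actual expansion used in FailPoint.cpp, and it assumes C++17.

```cpp
#include <string>
#include <vector>

// Hypothetical list written in the same style as APPLY_FOR_FAILPOINTS.
#define APPLY_FOR_DEMO_POINTS(M)   \
    M(skip_check_segment_update)   \
    M(gc_skip_update_safe_point)

namespace demo
{
// Expansion 1: declare one string constant per list entry.
#define M(NAME) inline constexpr char NAME[] = #NAME;
APPLY_FOR_DEMO_POINTS(M)
#undef M

// Expansion 2: collect every declared constant into a vector.
inline std::vector<std::string> allNames()
{
    std::vector<std::string> names;
#define M(NAME) names.emplace_back(NAME);
    APPLY_FOR_DEMO_POINTS(M)
#undef M
    return names;
}
} // namespace demo
```

With this pattern, adding a single `M(...)` line to the list is enough for every expansion to pick up the new name.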
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.h
@@ -72,6 +72,10 @@ struct DMContext : private boost::noncopyable
const size_t delta_small_column_file_bytes;
// The expected stable pack rows.
const size_t stable_pack_rows;
// The rows of segment to be regarded as small. Small segments will be merged.
const size_t small_segment_rows;
// The bytes of segment to be regarded as small. Small segments will be merged.
const size_t small_segment_bytes;

// The number of points to check for calculating region split.
const size_t region_split_check_points = 128;
@@ -111,6 +115,8 @@ struct DMContext : private boost::noncopyable
, delta_small_column_file_rows(settings.dt_segment_delta_small_column_file_rows)
, delta_small_column_file_bytes(settings.dt_segment_delta_small_column_file_size)
, stable_pack_rows(settings.dt_segment_stable_pack_rows)
, small_segment_rows(settings.dt_segment_limit_rows / 3)
Contributor

It seems the previous merge threshold was effectively settings.dt_segment_limit_rows / 3 + settings.dt_segment_limit_rows / 5, which is about settings.dt_segment_limit_rows / 2. Is this change intentional?

Member Author
breezewish Sep 14, 2022

Yes. In my ingestion test, small segments (around 100 MiB) were 25% faster than normal-sized segments. Using /3 keeps these small segments from being merged together. However, I admit that tuning performance through this parameter is a bit of a hack until we understand why small segments are so much faster.

Contributor

That is really interesting. We definitely should find out why smaller segments help performance. settings.dt_segment_limit_rows / 3 looks good to me now.

Contributor

Are smaller segments more suitable for the read thread pool? @JinheLin

Member Author

How about changing back to /2 to be more conservative? Changing back would not cause a performance regression; it would simply keep the previous segment structure. Then we would have sufficient time to study what is really making it fast(er).

, small_segment_bytes(settings.dt_segment_limit_size / 3)
, enable_logical_split(settings.dt_enable_logical_split)
, read_delta_only(settings.dt_read_delta_only)
, read_stable_only(settings.dt_read_stable_only)
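
The threshold arithmetic raised in the review thread on `small_segment_rows` can be checked with a small sketch. The struct and function names below are hypothetical. The old bound follows the removed `should_merge` and `get_merge_sibling` checks shown later in this diff; how the new `small_segment_rows` value is applied is not visible in this excerpt, so only its value is computed here.

```cpp
#include <cstddef>

// Hypothetical stand-in for dt_segment_limit_rows and dt_segment_limit_size.
struct SegmentLimits
{
    std::size_t limit_rows;
    std::size_t limit_bytes;
};

// Old behaviour: a segment below limit/3 could be merged with a next segment
// below limit/5, so a merged segment stayed under limit/3 + limit/5
// (8/15 of the limit, slightly more than one half).
constexpr std::size_t oldMergedRowsBound(std::size_t limit_rows)
{
    return limit_rows / 3 + limit_rows / 5;
}

// New value introduced by this diff: small_segment_rows = limit / 3.
constexpr std::size_t smallSegmentRows(std::size_t limit_rows)
{
    return limit_rows / 3;
}

// Mirrors the removed `should_merge` predicate: both rows and bytes must be
// below one third of their respective limits.
constexpr bool isSmallSegment(std::size_t rows, std::size_t bytes, const SegmentLimits & limits)
{
    return rows < limits.limit_rows / 3 && bytes < limits.limit_bytes / 3;
}
```

For a hypothetical limit of 1,500,000 rows, the old bound is 800,000 rows, compared with 750,000 for limit / 2 and 500,000 for limit / 3.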
56 changes: 6 additions & 50 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -105,7 +105,6 @@ std::pair<bool, bool> DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const Back
switch (task.type)
{
case TaskType::Split:
case TaskType::Merge:
case TaskType::MergeDelta:
is_heavy = true;
// reserve some task space for light tasks
@@ -1178,8 +1177,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
|| delta_bytes - delta_last_try_split_bytes >= delta_cache_limit_bytes))
|| (segment_rows >= segment_limit_rows * 3 || segment_bytes >= segment_limit_bytes * 3);

bool should_merge = segment_rows < segment_limit_rows / 3 && segment_bytes < segment_limit_bytes / 3;

// Don't do compact on starting up.
bool should_compact = (thread_type != ThreadType::Init) && std::max(static_cast<Int64>(column_file_count) - delta_last_try_compact_column_files, 0) >= 10;

@@ -1236,7 +1233,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
{
delta_last_try_flush_rows = delta_rows;
delta_last_try_flush_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}});
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment});
}
}
}
@@ -1246,36 +1243,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
if (segment->getDelta()->isUpdating())
return;

/// Now start trying structure update.

auto get_merge_sibling = [&]() -> SegmentPtr {
/// For complexity reason, currently we only try to merge with next segment. Normally it is good enough.

// The last segment cannot be merged.
if (segment->getRowKeyRange().isEndInfinite())
return {};
SegmentPtr next_segment;
{
std::shared_lock read_write_lock(read_write_mutex);

auto it = segments.find(segment->getRowKeyRange().getEnd());
// check legality
if (it == segments.end())
return {};
auto & cur_segment = it->second;
if (cur_segment.get() != segment.get())
return {};
++it;
if (it == segments.end())
return {};
next_segment = it->second;
auto limit = dm_context->segment_limit_rows / 5;
if (next_segment->getEstimatedRows() >= limit)
return {};
}
return next_segment;
};

auto try_fg_merge_delta = [&]() -> SegmentPtr {
// If the table is already dropped, don't trigger foreground merge delta when executing `remove region peer`,
// or the raft-log apply threads may be blocked.
@@ -1303,7 +1270,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
if (should_background_merge_delta)
{
delta_last_try_merge_delta_rows = delta_rows;
try_add_background_task(BackgroundTask{TaskType::MergeDelta, dm_context, segment, {}});
try_add_background_task(BackgroundTask{TaskType::MergeDelta, dm_context, segment});
return true;
}
return false;
@@ -1313,12 +1280,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
{
delta_last_try_split_rows = delta_rows;
delta_last_try_split_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Split, dm_context, seg, {}});
try_add_background_task(BackgroundTask{TaskType::Split, dm_context, seg});
return true;
}
return false;
};
auto try_fg_split = [&](const SegmentPtr & my_segment) -> bool {
auto try_fg_split = [&](const SegmentPtr & my_segment) {
auto my_segment_size = my_segment->getEstimatedBytes();
auto my_should_split = my_segment_size >= dm_context->segment_force_split_bytes;
if (my_should_split && !my_segment->isSplitForbidden())
@@ -1334,15 +1301,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
}
return false;
};
auto try_bg_merge = [&]() {
SegmentPtr merge_sibling;
if (should_merge && (merge_sibling = get_merge_sibling()))
{
try_add_background_task(BackgroundTask{TaskType::Merge, dm_context, segment, merge_sibling});
return true;
}
return false;
};
auto try_bg_compact = [&]() {
/// Compact task should be a really low priority task.
/// And if the segment is flushing,
@@ -1352,7 +1310,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
if (should_compact && !segment->isFlushing())
{
delta_last_try_compact_column_files = column_file_count;
try_add_background_task(BackgroundTask{TaskType::Compact, dm_context, segment, {}});
try_add_background_task(BackgroundTask{TaskType::Compact, dm_context, segment});
return true;
}
return false;
@@ -1361,7 +1319,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
if (should_place_delta_index)
{
delta_last_try_place_delta_index_rows = delta_rows;
try_add_background_task(BackgroundTask{TaskType::PlaceIndex, dm_context, segment, {}});
try_add_background_task(BackgroundTask{TaskType::PlaceIndex, dm_context, segment});
return true;
}
return false;
@@ -1406,8 +1364,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
if (try_bg_merge_delta())
return;
}
if (try_bg_merge())
return;
if (try_bg_compact())
return;
if (try_place_delta_index())
25 changes: 21 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -173,7 +173,6 @@ class DeltaMergeStore : private boost::noncopyable
enum TaskType
{
Split,
Merge,
MergeDelta,
Compact,
Flush,
@@ -221,8 +220,6 @@ class DeltaMergeStore : private boost::noncopyable
{
case Split:
return "Split";
case Merge:
return "Merge";
case MergeDelta:
return "MergeDelta";
case Compact:
@@ -259,7 +256,6 @@ class DeltaMergeStore : private boost::noncopyable

DMContextPtr dm_context;
SegmentPtr segment;
SegmentPtr next_segment;

explicit operator bool() const { return segment != nullptr; }
};
@@ -405,6 +401,27 @@ class DeltaMergeStore : private boost::noncopyable
/// Iterator over all segments and apply gc jobs.
UInt64 onSyncGc(Int64 limit);

/**
* Try to merge the segment in the current thread as the GC operation.
* This function may be blocking, and should be called in the GC background thread.
*/
SegmentPtr gcTrySegmentMerge(const DMContextPtr & dm_context, const SegmentPtr & segment);

/**
* Try to merge delta in the current thread as the GC operation.
* This function may be blocking, and should be called in the GC background thread.
*/
SegmentPtr gcTrySegmentMergeDelta(const DMContextPtr & dm_context, const SegmentPtr & segment, const SegmentPtr & prev_segment, const SegmentPtr & next_segment, DB::Timestamp gc_safe_point);

/**
* Starting from the given base segment, find continuous segments that could be merged.
*
* When there are mergeable segments, the baseSegment is returned in index 0 and mergeable segments are then placed in order.
* It is ensured that there are at least 2 elements in the returned vector.
* When there is no mergeable segment, the returned vector will be empty.
*/
std::vector<SegmentPtr> getMergeableSegments(const DMContextPtr & context, const SegmentPtr & baseSegment);

/// Apply DDL `commands` on `table_columns`
void applyAlters(const AlterCommands & commands, //
OptionTableInfoConstRef table_info,
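
The return contract documented for `getMergeableSegments` (base segment at index 0, followed by contiguous mergeable segments in order, and either at least two elements or an empty vector) can be illustrated with a simplified sketch. `ToySegment` and `findMergeable` are hypothetical, and the accumulation rule used here (keep adding following segments while the running row total stays below a bound) is an assumption for illustration, not the rule implemented in this PR.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical, simplified segment: only an id and an estimated row count.
struct ToySegment
{
    int id;
    std::size_t rows;
};

// Sketch of the documented return contract only.
// ASSUMPTION: segments are mergeable while the running total of rows stays
// below max_merged_rows; the real criterion is not shown in this excerpt.
std::vector<ToySegment> findMergeable(
    const std::vector<ToySegment> & ordered,
    std::size_t base_index,
    std::size_t max_merged_rows)
{
    std::vector<ToySegment> result;
    if (base_index >= ordered.size())
        return result;

    std::size_t total = ordered[base_index].rows;
    if (total >= max_merged_rows)
        return result; // The base segment itself is not small.

    result.push_back(ordered[base_index]); // Base segment goes to index 0.
    for (std::size_t i = base_index + 1; i < ordered.size(); ++i)
    {
        if (total + ordered[i].rows >= max_merged_rows)
            break; // Only contiguous segments are considered.
        total += ordered[i].rows;
        result.push_back(ordered[i]);
    }

    // Contract: at least 2 elements, otherwise an empty vector.
    if (result.size() < 2)
        result.clear();
    return result;
}
```

Returning an empty vector rather than a single-element one lets a caller test `empty()` to decide whether there is anything to merge.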