From af90192f5dde9d05196ea6532dfdb048134d386f Mon Sep 17 00:00:00 2001 From: Wish Date: Thu, 18 Aug 2022 01:09:59 +0800 Subject: [PATCH] Yield CPU for concurrent flush and concurrent mergeDelta (#5410) Signed-off-by: Wish --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 58 +++++++++++++------ 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 01747e8e1fe..bcbc1110494 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -981,6 +981,8 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range) { + size_t sleep_ms = 5; + RowKeyRange cur_range = range; while (!cur_range.none()) { @@ -1004,11 +1006,21 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa } segment_range = segment->getRowKeyRange(); - // Flush could fail. if (segment->flushCache(*dm_context)) { break; } + else if (!try_until_succeed) + { + return false; + } + + // Flush could fail. Typical cases: + // #1. The segment is abandoned (because an update has finished) + // #2. There is another flush in progress, for example, triggered in the background + // Let's sleep 5ms ~ 100ms and then retry the flush. 
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + sleep_ms = std::min(sleep_ms * 2, 100); } cur_range.setStart(segment_range.end); @@ -1041,6 +1053,8 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex auto dm_context = newDMContext(context, context.getSettingsRef(), /*tracing_id*/ fmt::format("mergeDeltaBySegment_{}", latest_gc_safe_point.load(std::memory_order_relaxed))); + size_t sleep_ms = 50; + while (true) { SegmentPtr segment; @@ -1054,25 +1068,33 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex segment = segment_it->second; } - if (!segment->flushCache(*dm_context)) + if (segment->flushCache(*dm_context)) { - // If the flush failed, it means there are parallel updates to the segment in the background. - // In this case, we try again. - continue; - } - - const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); - if (new_segment) - { - const auto segment_end = new_segment->getRowKeyRange().end; - if (unlikely(*segment_end.value <= *start_key.value)) + const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); + if (new_segment) { - // The next start key must be > current start key - LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString()); - throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR); - } - return new_segment->getRowKeyRange(); - } + const auto segment_end = new_segment->getRowKeyRange().end; + if (unlikely(*segment_end.value <= *start_key.value)) + { + // The next start key must be > current start key + LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString()); + throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR); + } + return new_segment->getRowKeyRange(); + } // else: sleep and retry + } // else: sleep and retry + + // Typical cases: + // #1. 
flushCache failed + // - The segment is abandoned (because the segment was updated) + // - There is another flush in progress (e.g. triggered in the background) + // #2. segmentMergeDelta failed + // - The segment is abandoned (because the segment was updated) + // - The segment is being updated (e.g. a split-preparation is running, which holds a for-write snapshot). + // The segment update may take seconds to finish, so let's sleep for a short time + // (50ms ~ 1000ms) and then retry. + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + sleep_ms = std::min(sleep_ms * 2, 1000); } }