From d7f08777d1115ff9af6ba00316b76b4c95f2766d Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Wed, 20 Jul 2022 13:47:08 +0800 Subject: [PATCH] This is an automated cherry-pick of #5410 Signed-off-by: ti-chi-bot --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 71 ++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 9f2b43d75e4..c208845c061 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -853,6 +853,8 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range) { + size_t sleep_ms = 5; + RowKeyRange cur_range = range; while (!cur_range.none()) { @@ -876,11 +878,24 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa } segment_range = segment->getRowKeyRange(); - // Flush could fail. if (segment->flushCache(*dm_context)) { break; } +<<<<<<< HEAD +======= + else if (!try_until_succeed) + { + return false; + } + + // Flush could fail. Typical cases: + // #1. The segment is abandoned (due to an update is finished) + // #2. There is another flush in progress, for example, triggered in background + // Let's sleep 5ms ~ 100ms and then retry flush again. 
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + sleep_ms = std::min<size_t>(sleep_ms * 2, 100); +>>>>>>> 9ccfc4b3ea (Yield CPU for concurrent flush and concurrent mergeDelta (#5410)) } cur_range.setStart(segment_range.end); @@ -907,6 +922,60 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) } } +<<<<<<< HEAD +======= +std::optional<RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Context & context, const RowKeyValue & start_key, const TaskRunThread run_thread) +{ + updateGCSafePoint(); + auto dm_context = newDMContext(context, context.getSettingsRef(), + /*tracing_id*/ fmt::format("mergeDeltaBySegment_{}", latest_gc_safe_point.load(std::memory_order_relaxed))); + + size_t sleep_ms = 50; + + while (true) + { + SegmentPtr segment; + { + std::shared_lock lock(read_write_mutex); + const auto segment_it = segments.upper_bound(start_key.toRowKeyValueRef()); + if (segment_it == segments.end()) + { + return std::nullopt; + } + segment = segment_it->second; + } + + if (segment->flushCache(*dm_context)) + { + const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); + if (new_segment) + { + const auto segment_end = new_segment->getRowKeyRange().end; + if (unlikely(*segment_end.value <= *start_key.value)) + { + // The next start key must be > current start key + LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString()); + throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR); + } + return new_segment->getRowKeyRange(); + } // else: sleep and retry + } // else: sleep and retry + + // Typical cases: + // #1. flushCache failed + // - The segment is abandoned (due to segment updated) + // - There is another flush in progress (e.g. triggered in background) + // #2. segmentMergeDelta failed + // - The segment is abandoned (due to segment updated) + // - The segment is updating (e.g. a split-preparation is working, which occupies a for-write snapshot). 
+ // It could be possible to take seconds to finish the segment updating, so let's sleep for a short time + // (50ms ~ 1000ms) and then retry. + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + sleep_ms = std::min<size_t>(sleep_ms * 2, 1000); + } +} + +>>>>>>> 9ccfc4b3ea (Yield CPU for concurrent flush and concurrent mergeDelta (#5410)) void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & range) { auto dm_context = newDMContext(db_context, db_context.getSettingsRef());