diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index a74404f3dbb..1a479273088 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -981,6 +981,8 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range) { + size_t sleep_ms = 5; + RowKeyRange cur_range = range; while (!cur_range.none()) { @@ -1004,11 +1006,24 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa } segment_range = segment->getRowKeyRange(); - // Flush could fail. if (segment->flushCache(*dm_context)) { break; } +<<<<<<< HEAD +======= + else if (!try_until_succeed) + { + return false; + } + + // Flush could fail. Typical cases: + // #1. The segment is abandoned (due to an update is finished) + // #2. There is another flush in progress, for example, triggered in background + // Let's sleep 5ms ~ 100ms and then retry flush again. + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + sleep_ms = std::min(sleep_ms * 2, 100); +>>>>>>> 9ccfc4b3ea (Yield CPU for concurrent flush and concurrent mergeDelta (#5410)) } cur_range.setStart(segment_range.end); @@ -1041,6 +1056,8 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex auto dm_context = newDMContext(context, context.getSettingsRef(), /*tracing_id*/ fmt::format("mergeDeltaBySegment_{}", latest_gc_safe_point.load(std::memory_order_relaxed))); + size_t sleep_ms = 50; + while (true) { SegmentPtr segment; @@ -1054,18 +1071,41 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex segment = segment_it->second; } +<<<<<<< HEAD const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); if (new_segment) { const auto segment_end = new_segment->getRowKeyRange().end; if (unlikely(*segment_end.value <= *start_key.value)) +======= + if (segment->flushCache(*dm_context)) + { + const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); + if (new_segment) +>>>>>>> 9ccfc4b3ea (Yield CPU for concurrent flush and concurrent mergeDelta (#5410)) { - // The next start key must be > current start key - LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString()); - throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR); - } - return new_segment->getRowKeyRange(); - } + const auto segment_end = new_segment->getRowKeyRange().end; + if (unlikely(*segment_end.value <= *start_key.value)) + { + // The next start key must be > current start key + LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString()); + throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR); + } + return new_segment->getRowKeyRange(); + } // else: sleep and retry + } // else: sleep and retry + + // Typical cases: + // #1. flushCache failed + // - The segment is abandoned (due to segment updated) + // - There is another flush in progress (e.g. triggered in background) + // #2. segmentMergeDelta failed + // - The segment is abandoned (due to segment updated) + // - The segment is updating (e.g. a split-preparation is working, which occupies a for-write snapshot). + // It could be possible to take seconds to finish the segment updating, so let's sleep for a short time + // (50ms ~ 1000ms) and then retry. + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + sleep_ms = std::min(sleep_ms * 2, 1000); } }