Skip to content

Commit

Permalink
This is an automated cherry-pick of #5410
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
breezewish authored and ti-chi-bot committed Jul 20, 2022
1 parent 432dc68 commit fba64c5
Showing 1 changed file with 47 additions and 7 deletions.
54 changes: 47 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,8 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings

void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range)
{
size_t sleep_ms = 5;

RowKeyRange cur_range = range;
while (!cur_range.none())
{
Expand All @@ -1004,11 +1006,24 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa
}
segment_range = segment->getRowKeyRange();

// Flush could fail.
if (segment->flushCache(*dm_context))
{
break;
}
<<<<<<< HEAD
=======
else if (!try_until_succeed)
{
return false;
}

// Flush could fail. Typical cases:
// #1. The segment is abandoned (due to an update is finished)
// #2. There is another flush in progress, for example, triggered in background
// Let's sleep 5ms ~ 100ms and then retry flush again.
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
sleep_ms = std::min(sleep_ms * 2, 100);
>>>>>>> 9ccfc4b3ea (Yield CPU for concurrent flush and concurrent mergeDelta (#5410))
}

cur_range.setStart(segment_range.end);
Expand Down Expand Up @@ -1041,6 +1056,8 @@ std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Contex
auto dm_context = newDMContext(context, context.getSettingsRef(),
/*tracing_id*/ fmt::format("mergeDeltaBySegment_{}", latest_gc_safe_point.load(std::memory_order_relaxed)));

size_t sleep_ms = 50;

while (true)
{
SegmentPtr segment;
Expand All @@ -1054,18 +1071,41 @@ std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Contex
segment = segment_it->second;
}

<<<<<<< HEAD
const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread);
if (new_segment)
{
const auto segment_end = new_segment->getRowKeyRange().end;
if (unlikely(*segment_end.value <= *start_key.value))
=======
if (segment->flushCache(*dm_context))
{
const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread);
if (new_segment)
>>>>>>> 9ccfc4b3ea (Yield CPU for concurrent flush and concurrent mergeDelta (#5410))
{
// The next start key must be > current start key
LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString());
throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR);
}
return new_segment->getRowKeyRange();
}
const auto segment_end = new_segment->getRowKeyRange().end;
if (unlikely(*segment_end.value <= *start_key.value))
{
// The next start key must be > current start key
LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString());
throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR);
}
return new_segment->getRowKeyRange();
} // else: sleep and retry
} // else: sleep and retry

// Typical cases:
// #1. flushCache failed
// - The segment is abandoned (due to segment updated)
// - There is another flush in progress (e.g. triggered in background)
// #2. segmentMergeDelta failed
// - The segment is abandoned (due to segment updated)
// - The segment is updating (e.g. a split-preparation is working, which occupies a for-write snapshot).
// It could be possible to take seconds to finish the segment updating, so let's sleep for a short time
// (50ms ~ 1000ms) and then retry.
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
sleep_ms = std::min(sleep_ms * 2, 1000);
}
}

Expand Down

0 comments on commit fba64c5

Please sign in to comment.