Skip to content

Commit

Permalink
Yield CPU for concurrent flush and concurrent mergeDelta (#5410)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish committed Aug 17, 2022
1 parent 6feacc2 commit af90192
Showing 1 changed file with 40 additions and 18 deletions.
58 changes: 40 additions & 18 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,8 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings

void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range)
{
size_t sleep_ms = 5;

RowKeyRange cur_range = range;
while (!cur_range.none())
{
Expand All @@ -1004,11 +1006,21 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa
}
segment_range = segment->getRowKeyRange();

// Flush could fail.
if (segment->flushCache(*dm_context))
{
break;
}
else if (!try_until_succeed)
{
return false;
}

// Flush could fail. Typical cases:
// #1. The segment is abandoned (because an update has finished)
// #2. There is another flush in progress, for example, one triggered in the background
// Let's sleep 5ms ~ 100ms and then retry the flush.
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
sleep_ms = std::min(sleep_ms * 2, 100);
}

cur_range.setStart(segment_range.end);
Expand Down Expand Up @@ -1041,6 +1053,8 @@ std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Contex
auto dm_context = newDMContext(context, context.getSettingsRef(),
/*tracing_id*/ fmt::format("mergeDeltaBySegment_{}", latest_gc_safe_point.load(std::memory_order_relaxed)));

size_t sleep_ms = 50;

while (true)
{
SegmentPtr segment;
Expand All @@ -1054,25 +1068,33 @@ std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Contex
segment = segment_it->second;
}

if (!segment->flushCache(*dm_context))
if (segment->flushCache(*dm_context))
{
// If the flush failed, it means there are parallel updates to the segment in the background.
// In this case, we try again.
continue;
}

const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread);
if (new_segment)
{
const auto segment_end = new_segment->getRowKeyRange().end;
if (unlikely(*segment_end.value <= *start_key.value))
const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread);
if (new_segment)
{
// The next start key must be > current start key
LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString());
throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR);
}
return new_segment->getRowKeyRange();
}
const auto segment_end = new_segment->getRowKeyRange().end;
if (unlikely(*segment_end.value <= *start_key.value))
{
// The next start key must be > current start key
LOG_FMT_ERROR(log, "Assert new_segment.end {} > start {} failed", segment_end.toDebugString(), start_key.toDebugString());
throw Exception("Assert segment range failed", ErrorCodes::LOGICAL_ERROR);
}
return new_segment->getRowKeyRange();
} // else: sleep and retry
} // else: sleep and retry

// Typical cases:
// #1. flushCache failed
// - The segment is abandoned (because the segment was updated)
// - There is another flush in progress (e.g. triggered in the background)
// #2. segmentMergeDelta failed
// - The segment is abandoned (because the segment was updated)
// - The segment is being updated (e.g. a split-preparation is in progress, which holds a for-write snapshot).
// Finishing the segment update may take seconds, so let's sleep for a short time
// (50ms ~ 1000ms) and then retry.
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
sleep_ms = std::min(sleep_ms * 2, 1000);
}
}

Expand Down

0 comments on commit af90192

Please sign in to comment.