Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize apply speed under heavy write pressure #4883

Merged
merged 10 commits into from
Jul 4, 2022
Prev Previous commit
Next Next commit
ignore CompactLog command if the segment is flushing
lidezhu committed Jun 24, 2022
commit f8f7206b0a6c6de8bbb79f10231c281434201f11
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
@@ -142,13 +142,12 @@ bool DeltaValueSpace::ingestColumnFiles(DMContext & /*context*/, const RowKeyRan
bool DeltaValueSpace::flush(DMContext & context)
{
bool v = false;
// Other thread is flushing, just return.
if (!is_flushing.compare_exchange_strong(v, true))
{
LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", info());
// other thread is flushing, just return.
LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", simpleInfo());
return false;
}

SCOPE_EXIT({
bool v = true;
if (!is_flushing.compare_exchange_strong(v, false))
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
@@ -77,7 +77,8 @@ class DeltaValueSpace
/// Note that those things can not be done at the same time.
std::atomic_bool is_updating = false;

/// Note that it's safe to do multiple flush concurrently but only one of them can succeed.
/// Note that it's safe to do multiple flush concurrently but only one of them can succeed,
/// and other thread's work is just a waste of resource.
/// So we only allow one flush task running at any time to aviod waste resource.
std::atomic_bool is_flushing = false;

25 changes: 16 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
@@ -980,14 +980,14 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings
checkSegmentUpdate(dm_context, segment, ThreadType::Write);
}

void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range)
bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed)
{
RowKeyRange cur_range = range;
while (!cur_range.none())
{
RowKeyRange segment_range;

// Keep trying until succeeded.
// Keep trying until succeeded if needed.
while (true)
{
SegmentPtr segment;
@@ -1010,10 +1010,15 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa
{
break;
}
else if (!try_until_succeed)
{
return false;
}
}

cur_range.setStart(segment_range.end);
}
return true;
}

void DeltaMergeStore::mergeDeltaAll(const Context & context)
@@ -1347,12 +1352,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
&& (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows
|| delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes);
bool should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 3 || unsaved_bytes >= delta_cache_limit_bytes * 3;
// Don't block write thread unless the data in delta layer is really too large.
// And read thread never do foreground flush so no need to special check for read thread.
if (thread_type == ThreadType::Write)
{
should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 100 || unsaved_bytes >= delta_cache_limit_bytes * 100;
}

bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) //
&& (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows
@@ -1410,6 +1409,10 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
}
else if (should_background_flush)
{
/// It's meaningless to add more flush tasks if the segment is flushing.
/// Because only one flush task can proceed at any time.
/// And after the current flush task finished,
/// it will call `checkSegmentUpdate` again to check whether there is more flush task to do.
if (!segment->isFlushing())
{
delta_last_try_flush_rows = delta_rows;
@@ -1511,7 +1514,11 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
return false;
};
auto try_bg_compact = [&]() {
/// Only add background compact task when this segment is not flushing to reduce lock contention on the segment.
/// Compact task should be a really low priority task.
/// And if the segment is flushing,
/// we should avoid adding background compact task to reduce lock contention on the segment and save disk throughput.
/// And after the current flush task complete,
/// it will call `checkSegmentUpdate` again to check whether there is other kinds of task to do.
if (should_compact && !segment->isFlushing())
{
delta_last_try_compact_column_files = column_file_count;
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
@@ -367,14 +367,14 @@ class DeltaMergeStore : private boost::noncopyable
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Force flush all data to disk.
void flushCache(const Context & context, const RowKeyRange & range)
/// Try flush all data in `range` to disk and return whether the task succeed.
bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true)
{
auto dm_context = newDMContext(context, context.getSettingsRef());
flushCache(dm_context, range);
return flushCache(dm_context, range, try_until_succeed);
}

void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range);
bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true);

/// Merge delta into the stable layer for all segments.
///
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
@@ -1333,7 +1333,9 @@ bool Segment::compactDelta(DMContext & dm_context)
CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_DeltaCompact};
GET_METRIC(tiflash_storage_subtask_count, type_delta_compact).Increment();
Stopwatch watch;
SCOPE_EXIT({ GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_compact).Observe(watch.elapsedSeconds()); });
SCOPE_EXIT({
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_compact).Observe(watch.elapsedSeconds());
});

return delta->compact(dm_context);
}
2 changes: 1 addition & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ class IManageableStorage : public IStorage

virtual void flushCache(const Context & /*context*/) {}

virtual void flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/) {}
virtual bool flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/, [[maybe_unused]] bool try_until_succeed = true) { return true; }

virtual BlockInputStreamPtr status() { return {}; }

6 changes: 3 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
@@ -775,12 +775,12 @@ void StorageDeltaMerge::checkStatus(const Context & context)

void StorageDeltaMerge::flushCache(const Context & context)
{
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size));
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size), /* try_until_succeed */ true);
}

void StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush)
bool StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed)
{
getAndMaybeInitStore()->flushCache(context, range_to_flush);
return getAndMaybeInitStore()->flushCache(context, range_to_flush, try_until_succeed);
}

void StorageDeltaMerge::mergeDelta(const Context & context)
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ class StorageDeltaMerge

void flushCache(const Context & context) override;

void flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) override;
bool flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed = true) override;

/// Merge delta into the stable layer for all segments.
///
27 changes: 17 additions & 10 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@ void KVStore::traverseRegions(std::function<void(RegionID, const RegionPtr &)> &
callback(region.first, region.second);
}

void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log)
bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed)
{
auto table_id = region.getMappedTableID();
auto storage = tmt.getStorages().get(table_id);
@@ -139,7 +139,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
"tryFlushRegionCacheInStorage can not get table for region {} with table id {}, ignored",
region.toString(),
table_id);
return;
return true;
}

try
@@ -151,14 +151,15 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
region.getRange()->getMappedTableID(),
storage->isCommonHandle(),
storage->getRowKeyColumnSize());
storage->flushCache(tmt.getContext(), rowkey_range);
return storage->flushCache(tmt.getContext(), rowkey_range, try_until_succeed);
}
catch (DB::Exception & e)
{
// We can ignore if storage is already dropped.
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
}
return true;
}

void KVStore::tryPersist(RegionID region_id)
@@ -366,12 +367,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and perist mem data.
// if rows or bytes more than threshold, try to flush cache and persist mem data.
return true;
}
else
{
// if thhere is little data in mem, wait until time interval reached threshold.
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
@@ -381,11 +382,17 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(

if (check_sync_log())
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
}
else
{
return EngineStoreApplyRes::None;
}
}
return EngineStoreApplyRes::None;
}
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ class KVStore final : private boost::noncopyable

void tryPersist(RegionID region_id);

static void tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log);
static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed = true);

size_t regionSize() const;
EngineStoreApplyRes handleAdminRaftCmd(raft_cmdpb::AdminRequest && request,
5 changes: 4 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
@@ -230,7 +230,10 @@ void removeObsoleteDataInStorage(
auto rowkey_range
= DM::RowKeyRange::fromRegionRange(handle_range, table_id, table_id, storage->isCommonHandle(), storage->getRowKeyColumnSize());
dm_storage->deleteRange(rowkey_range, context->getSettingsRef());
dm_storage->flushCache(*context, rowkey_range); // flush to disk
// flush to disk and keep try until success
while (!dm_storage->flushCache(*context, rowkey_range))
{
}
}
catch (DB::Exception & e)
{