Skip to content

Commit

Permalink
Check RU in read thread of Storage. (#8369)
Browse files Browse the repository at this point in the history
close #8362
  • Loading branch information
JinheLin authored Nov 17, 2023
1 parent 226b613 commit 1123f88
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 23 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,12 @@ namespace DB
Counter, \
F(type_sche_no_pool, {"type", "sche_no_pool"}), \
F(type_sche_no_slot, {"type", "sche_no_slot"}), \
F(type_sche_no_ru, {"type", "sche_no_ru"}), \
F(type_sche_no_segment, {"type", "sche_no_segment"}), \
F(type_sche_active_segment_limit, {"type", "sche_active_segment_limit"}), \
F(type_sche_from_cache, {"type", "sche_from_cache"}), \
F(type_sche_new_task, {"type", "sche_new_task"}), \
F(type_ru_exhausted, {"type", "ru_exhausted"}), \
F(type_add_cache_succ, {"type", "add_cache_succ"}), \
F(type_add_cache_stale, {"type", "add_cache_stale"}), \
F(type_get_cache_miss, {"type", "get_cache_miss"}), \
Expand Down
23 changes: 16 additions & 7 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ Block ColumnFileSetReader::readPKVersion(size_t offset, size_t limit)
return block;
}

static Int64 columnsSize(MutableColumns & columns)
{
Int64 bytes = 0;
for (const auto & col : columns)
bytes += col->byteSize();
return bytes;
}

size_t ColumnFileSetReader::readRows(
MutableColumns & output_columns,
size_t offset,
Expand All @@ -156,6 +164,7 @@ size_t ColumnFileSetReader::readRows(
if (end == start)
return 0;

auto bytes_before_read = columnsSize(output_columns);
auto [start_file_index, rows_start_in_start_file] = locatePosByAccumulation(column_file_rows_end, start);
auto [end_file_index, rows_end_in_end_file] = locatePosByAccumulation(column_file_rows_end, end);

Expand Down Expand Up @@ -187,13 +196,13 @@ size_t ColumnFileSetReader::readRows(
}
}

UInt64 delta_bytes = 0;
for (const auto & col : output_columns)
delta_bytes += col->byteSize();

lac_bytes_collector.collect(delta_bytes);
if (likely(context.scan_context))
context.scan_context->total_user_read_bytes += delta_bytes;
if (auto delta_bytes = columnsSize(output_columns) - bytes_before_read; delta_bytes > 0)
{
if (row_ids == nullptr)
lac_bytes_collector.collect(delta_bytes);
if (likely(context.scan_context))
context.scan_context->total_user_read_bytes += delta_bytes;
}

return actual_read;
}
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,8 @@ BlockInputStreams DeltaMergeStore::readRaw(
after_segment_read,
req_info,
enable_read_thread,
final_num_stream);
final_num_stream,
dm_context->scan_context->resource_group_name);

BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
Expand Down Expand Up @@ -1061,7 +1062,8 @@ void DeltaMergeStore::readRaw(
after_segment_read,
req_info,
enable_read_thread,
final_num_stream);
final_num_stream,
dm_context->scan_context->resource_group_name);

if (enable_read_thread)
{
Expand Down Expand Up @@ -1193,7 +1195,8 @@ BlockInputStreams DeltaMergeStore::read(
after_segment_read,
log_tracing_id,
enable_read_thread,
final_num_stream);
final_num_stream,
dm_context->scan_context->resource_group_name);

BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
Expand Down Expand Up @@ -1297,7 +1300,8 @@ void DeltaMergeStore::read(
after_segment_read,
log_tracing_id,
enable_read_thread,
final_num_stream);
final_num_stream,
dm_context->scan_context->resource_group_name);
const auto & columns_after_cast = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;

if (enable_read_thread)
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ void MergedTask::initOnce()
setStreamFinished(cur_idx);
continue;
}
if (pool->isRUExhausted())
{
continue;
}
stream = pool->buildInputStream(task);
fiu_do_on(FailPoints::exception_in_merged_task_init, {
throw Exception("Fail point exception_in_merged_task_init is triggered.", ErrorCodes::FAIL_POINT_ERROR);
Expand Down Expand Up @@ -74,11 +78,16 @@ int MergedTask::readOneBlock()
continue;
}

if (pool->getFreeBlockSlots() <= 0)
if (pool->getFreeBlockSlots() <= 0 || pool->isRUExhausted())
{
continue;
}

if (stream == nullptr)
{
stream = pool->buildInputStream(task);
}

if (pool->readOneBlock(stream, task))
{
read_block_count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,39 @@ SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector<

bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & pool)
{
return pool->getFreeBlockSlots() > 0 && // Block queue is not full and
(merged_task_pool.has(pool->pool_id) || // can schedule a segment from MergedTaskPool or
pool->getFreeActiveSegments() > 0); // schedule a new segment.
if (pool->getFreeBlockSlots() <= 0)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment();
return false;
}

if (pool->isRUExhausted())
{
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_ru).Increment();
return false;
}

// Check if there are segments that can be scheduled:
// 1. There are already activated segments.
if (merged_task_pool.has(pool->pool_id))
{
return true;
}
// 2. Not reach limitation, we can activate a segment.
if (pool->getFreeActiveSegments() > 0 && pool->getPendingSegmentCount() > 0)
{
return true;
}

if (pool->getFreeActiveSegments() <= 0)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_active_segment_limit).Increment();
}
else
{
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_segment).Increment();
}
return false;
}

SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock()
Expand All @@ -143,10 +173,6 @@ SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlo
{
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
}
else
{
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment();
}
return nullptr;
}

Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ struct SegmentSnapshot : private boost::noncopyable
UInt64 getRows() const { return delta->getRows() + stable->getRows(); }

bool isForUpdate() const { return delta->isForUpdate(); }

UInt64 estimatedBytesOfInternalColumns() const
{
// TODO: how about cluster index?
// handle + version + flag
return (sizeof(Int64) + sizeof(UInt64) + sizeof(UInt8)) * getRows();
}
};

/// A segment contains many rows of a table. A table is split into segments by consecutive ranges.
Expand Down
75 changes: 74 additions & 1 deletion dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t

MemoryTrackerSetter setter(true, mem_tracker.get());

if (likely(read_mode == ReadMode::Bitmap && !res_group_name.empty()))
{
auto bytes = t->read_snapshot->estimatedBytesOfInternalColumns();
LocalAdmissionController::global_instance->consumeResource(res_group_name, bytesToRU(bytes), 0);
}
t->initInputStream(
columns_to_read,
max_version,
Expand Down Expand Up @@ -120,7 +125,8 @@ SegmentReadTaskPool::SegmentReadTaskPool(
AfterSegmentRead after_segment_read_,
const String & tracing_id,
bool enable_read_thread_,
Int64 num_streams_)
Int64 num_streams_,
const String & res_group_name_)
: pool_id(nextPoolId())
, mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
, extra_table_id_index(extra_table_id_index_)
Expand All @@ -141,6 +147,7 @@ SegmentReadTaskPool::SegmentReadTaskPool(
// Limiting the minimum number of reading segments to 2 is to avoid, as much as possible,
// situations where the computation may be faster and the storage layer may not be able to keep up.
, active_segment_limit(std::max(num_streams_, 2))
, res_group_name(res_group_name_)
{
if (tasks_wrapper.empty())
{
Expand Down Expand Up @@ -272,6 +279,7 @@ void SegmentReadTaskPool::pushBlock(Block && block)
{
blk_stat.push(block);
global_blk_stat.push(block);
read_bytes_after_last_check += block.bytes();
q.push(std::move(block), nullptr);
}

Expand Down Expand Up @@ -300,6 +308,12 @@ Int64 SegmentReadTaskPool::getFreeActiveSegmentsUnlock() const
return active_segment_limit - static_cast<Int64>(active_segment_ids.size());
}

Int64 SegmentReadTaskPool::getPendingSegmentCount() const
{
std::lock_guard lock(mutex);
return tasks_wrapper.getTasks().size();
}

bool SegmentReadTaskPool::exceptionHappened() const
{
return exception_happened.load(std::memory_order_relaxed);
Expand All @@ -320,4 +334,63 @@ void SegmentReadTaskPool::setException(const DB::Exception & e)
}
}

static Int64 currentMS()
{
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
}

static bool checkIsRUExhausted(const String & res_group_name)
{
auto priority = LocalAdmissionController::global_instance->getPriority(res_group_name);
if (unlikely(!priority.has_value()))
{
return false;
}
return LocalAdmissionController::isRUExhausted(*priority);
}

bool SegmentReadTaskPool::isRUExhausted()
{
auto res = isRUExhaustedImpl();
if (res)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_ru_exhausted).Increment();
}
return res;
}

bool SegmentReadTaskPool::isRUExhaustedImpl()
{
if (unlikely(res_group_name.empty() || LocalAdmissionController::global_instance == nullptr))
{
return false;
}

// To reduce lock contention in resource control,
// check if RU is exhuasted every `bytes_of_one_hundred_ru` or every `100ms`.

// Fast path.
Int64 ms = currentMS();
if (read_bytes_after_last_check < bytes_of_one_hundred_ru && ms - last_time_check_ru < check_ru_interval_ms)
{
return ru_is_exhausted; // Return result of last time.
}

std::lock_guard lock(ru_mu);
// If last thread has check is ru exhausted, use the result of last thread.
// Attention: `read_bytes_after_last_check` can be written concurrently in `pushBlock`.
ms = currentMS();
if (read_bytes_after_last_check < bytes_of_one_hundred_ru && ms - last_time_check_ru < check_ru_interval_ms)
{
return ru_is_exhausted; // Return result of last time.
}

// Check and reset everything.
read_bytes_after_last_check = 0;
ru_is_exhausted = checkIsRUExhausted(res_group_name);
last_time_check_ru = ms;
return ru_is_exhausted;
}

} // namespace DB::DM
15 changes: 14 additions & 1 deletion dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class SegmentReadTaskPool : private boost::noncopyable
AfterSegmentRead after_segment_read_,
const String & tracing_id,
bool enable_read_thread_,
Int64 num_streams_);
Int64 num_streams_,
const String & res_group_name_);

~SegmentReadTaskPool()
{
Expand Down Expand Up @@ -178,6 +179,7 @@ class SegmentReadTaskPool : private boost::noncopyable
Int64 decreaseUnorderedInputStreamRefCount();
Int64 getFreeBlockSlots() const;
Int64 getFreeActiveSegments() const;
Int64 getPendingSegmentCount() const;
bool valid() const;
void setException(const DB::Exception & e);

Expand Down Expand Up @@ -206,12 +208,16 @@ class SegmentReadTaskPool : private boost::noncopyable
}
}

bool isRUExhausted();

private:
Int64 getFreeActiveSegmentsUnlock() const;
bool exceptionHappened() const;
void finishSegment(const SegmentReadTaskPtr & seg);
void pushBlock(Block && block);

bool isRUExhaustedImpl();

const int extra_table_id_index;
ColumnDefines columns_to_read;
PushDownFilterPtr filter;
Expand Down Expand Up @@ -240,9 +246,16 @@ class SegmentReadTaskPool : private boost::noncopyable
const Int64 block_slot_limit;
const Int64 active_segment_limit;

const String res_group_name;
std::mutex ru_mu;
std::atomic<Int64> last_time_check_ru = 0;
std::atomic<bool> ru_is_exhausted = false;
std::atomic<UInt64> read_bytes_after_last_check = 0;

inline static std::atomic<uint64_t> pool_id_gen{1};
inline static BlockStat global_blk_stat;
static uint64_t nextPoolId() { return pool_id_gen.fetch_add(1, std::memory_order_relaxed); }
inline static constexpr Int64 check_ru_interval_ms = 100;
};

using SegmentReadTaskPoolPtr = std::shared_ptr<SegmentReadTaskPool>;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
if (likely(scan_context != nullptr))
{
scan_context->total_user_read_bytes += bytes;
lac_bytes_collector.collect(bytes);
if constexpr (!need_row_id)
{
lac_bytes_collector.collect(bytes);
}
}
}
BlockInputStreams::iterator current_stream;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/StorageDisaggregatedRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,8 @@ std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisagg
/*after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
executor_id,
/*enable_read_thread*/ true,
num_streams);
num_streams,
context.getDAGContext()->getResourceGroupName());
}
else
{
Expand Down

0 comments on commit 1123f88

Please sign in to comment.