WIP: Storages: Add some stats of table scanning #8990

Closed · wants to merge 1 commit
@@ -56,6 +56,9 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
{
merging_segments[seg_id].push_back(pool->pool_id);
}

pool->startTableScanning();

LOG_INFO(
pool->getLogger(),
"Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
62 changes: 59 additions & 3 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
@@ -83,8 +83,15 @@ bool SegmentReadTasksWrapper::empty() const
return ordered_tasks.empty() && unordered_tasks.empty();
}

Int64 SegmentReadTasksWrapper::size() const
{
return enable_read_thread ? unordered_tasks.size() : ordered_tasks.size();
}

BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t)
{
Stopwatch sw;
SCOPE_EXIT({ scanning_execution_ns += sw.elapsed(); });
MemoryTrackerSetter setter(true, mem_tracker.get());

t->fetchPages();
@@ -148,14 +155,50 @@ SegmentReadTaskPool::SegmentReadTaskPool(
// Limiting the minimum number of reading segments to 2 helps avoid, as much as possible,
// situations where computation is faster than the storage layer can keep up with.
, active_segment_limit(std::max(num_streams_, 2))
, seg_task_count(tasks_wrapper.size())
, res_group_name(res_group_name_)
{
if (tasks_wrapper.empty())
{
q.finish();
finishTableScanning();
}
}

SegmentReadTaskPool::~SegmentReadTaskPool()
{
auto [pop_times, pop_empty_times, max_queue_size] = q.getStat();
auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0;
auto total_count = blk_stat.totalCount();
auto total_bytes = blk_stat.totalBytes();
auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0;
auto max_pending_block_bytes = blk_avg_bytes * max_queue_size;
auto total_rows = blk_stat.totalRows();
LOG_INFO(
log,
"Done. pool_id={} pop={} pop_empty={} pop_empty_ratio={} "
"max_queue_size={} blk_avg_bytes={} max_pending_block_bytes={:.2f}MB "
"total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B "
"waitting_start_time={}ms scanning_wall_time={}ms scanning_execution_time={}ms "
"seg_task_count={} execution_time_per_seg={}ms",
pool_id,
pop_times,
pop_empty_times,
pop_empty_ratio,
max_queue_size,
blk_avg_bytes,
max_pending_block_bytes / 1024.0 / 1024.0,
total_count,
total_bytes / 1024.0 / 1024.0,
total_rows,
total_count > 0 ? total_rows / total_count : 0,
total_rows > 0 ? total_bytes / total_rows : 0,
waiting_start_clock.elapsedMilliseconds(),
scanning_wall_clock.elapsedMilliseconds(),
scanning_execution_ns / 1'000'000,
seg_task_count,
seg_task_count > 0 ? scanning_execution_ns / seg_task_count / 1'000'000 : 0);
}
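For reference, here is how the derived figures in this log line fall out. This is an illustrative, self-contained snippet with made-up inputs, not output from a real run; the variable names mirror the destructor above.

#include <cstdint>
#include <cstdio>

int main()
{
    // Hypothetical inputs, mirroring the destructor's locals.
    uint64_t pop_times = 1000, pop_empty_times = 250, max_queue_size = 16;
    uint64_t total_count = 4096;          // blocks
    uint64_t total_bytes = 512ULL << 20;  // 512 MiB
    uint64_t total_rows = 4096ULL * 1024; // 1024 rows per block
    int64_t scanning_execution_ns = 3'000'000'000;
    int64_t seg_task_count = 10;

    double pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; // 0.25
    uint64_t blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0;         // 128 KiB
    double max_pending_mb = blk_avg_bytes * max_queue_size / 1024.0 / 1024.0;         // 2.00 MB

    printf("pop_empty_ratio=%.2f blk_avg_bytes=%llu max_pending_block_bytes=%.2fMB "
           "avg_block_rows=%llu execution_time_per_seg=%lldms\n",
           pop_empty_ratio,
           (unsigned long long)blk_avg_bytes,
           max_pending_mb,
           (unsigned long long)(total_count > 0 ? total_rows / total_count : 0),
           (long long)(seg_task_count > 0 ? scanning_execution_ns / seg_task_count / 1'000'000 : 0));
}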

void SegmentReadTaskPool::finishSegment(const SegmentReadTaskPtr & seg)
{
after_segment_read(seg->dm_context, seg->segment);
@@ -168,7 +211,7 @@ void SegmentReadTaskPool::finishSegment(const SegmentReadTaskPtr & seg)
LOG_DEBUG(log, "finishSegment pool_id={} segment={} pool_finished={}", pool_id, seg, pool_finished);
if (pool_finished)
{
q.finish();
finishTableScanning();
}
}

@@ -233,6 +276,8 @@ MergingSegments::iterator SegmentReadTaskPool::scheduleSegment(

bool SegmentReadTaskPool::readOneBlock(BlockInputStreamPtr & stream, const SegmentReadTaskPtr & seg)
{
Stopwatch sw;
SCOPE_EXIT({ scanning_execution_ns += sw.elapsed(); });
MemoryTrackerSetter setter(true, mem_tracker.get());
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);
auto block = stream->read();
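Both buildInputStream() and readOneBlock() open with the same measurement pattern: a Stopwatch plus a scope guard, so the elapsed time is folded into the shared atomic counter on every exit path, including exceptions. A minimal standalone sketch of that pattern follows; ScopeExitGuard and makeScopeExit are hypothetical stand-ins for TiFlash's SCOPE_EXIT macro.

#include <atomic>
#include <chrono>
#include <utility>

std::atomic<long long> scanning_execution_ns{0};

// Stand-in for TiFlash's SCOPE_EXIT: runs the callback when the guard
// goes out of scope, whether the enclosing function returns or throws.
template <typename F>
struct ScopeExitGuard
{
    F callback;
    ~ScopeExitGuard() { callback(); }
};

template <typename F>
ScopeExitGuard<F> makeScopeExit(F f)
{
    return {std::move(f)};
}

void readOneBlockLike()
{
    auto begin = std::chrono::steady_clock::now();
    auto guard = makeScopeExit([&] {
        auto elapsed = std::chrono::steady_clock::now() - begin;
        scanning_execution_ns
            += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    });
    // ... the actual stream->read() work would happen here ...
}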
@@ -332,7 +377,7 @@ void SegmentReadTaskPool::setException(const DB::Exception & e)
{
exception = e;
exception_happened.store(true, std::memory_order_relaxed);
q.finish();
finishTableScanning();
}
}

@@ -395,4 +440,15 @@ bool SegmentReadTaskPool::isRUExhaustedImpl()
return ru_is_exhausted;
}

void SegmentReadTaskPool::startTableScanning()
{
waiting_start_clock.stop();
scanning_wall_clock.start();
}

void SegmentReadTaskPool::finishTableScanning()
{
q.finish();
scanning_wall_clock.stop();
}

} // namespace DB::DM
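Taken together, the two clocks split a pool's lifetime into a waiting phase (from construction until SegmentReadTaskScheduler::add() accepts the pool) and a scanning phase (from then until the last segment finishes, the pool errors out, or it turns out to be empty). A compilable sketch of that timing model, using a simplified stand-in for TiFlash's Stopwatch (which starts on construction):

#include <chrono>

// Simplified stand-in for TiFlash's Stopwatch: it starts running on
// construction, and stop() freezes the elapsed time.
class SimpleStopwatch
{
    using Clock = std::chrono::steady_clock;
    Clock::time_point begin = Clock::now();
    Clock::duration frozen{0};
    bool running = true;

public:
    void start()
    {
        begin = Clock::now();
        running = true;
    }
    void stop()
    {
        if (running)
            frozen = Clock::now() - begin;
        running = false;
    }
    long long elapsedMilliseconds() const
    {
        auto d = running ? Clock::now() - begin : frozen;
        return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
    }
};

// The pool's timing model: the waiting clock runs from construction until
// the scheduler accepts the pool; the scanning clock runs from then until
// finishTableScanning() is called.
struct PoolTimingModel
{
    SimpleStopwatch waiting_start_clock;
    SimpleStopwatch scanning_wall_clock;

    void startTableScanning()
    {
        waiting_start_clock.stop();
        scanning_wall_clock.start();
    }
    void finishTableScanning() { scanning_wall_clock.stop(); }
};

Note that in the diff, finishTableScanning() is reached on every terminal path: the empty-pool branch of the constructor, the last finishSegment(), and setException().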
37 changes: 10 additions & 27 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -94,6 +94,7 @@ class SegmentReadTasksWrapper
const std::unordered_map<GlobalSegmentID, SegmentReadTaskPtr> & getTasks() const;

bool empty() const;
Int64 size() const;

private:
bool enable_read_thread;
@@ -120,33 +121,7 @@ class SegmentReadTaskPool
Int64 num_streams_,
const String & res_group_name_);

~SegmentReadTaskPool() override
{
auto [pop_times, pop_empty_times, max_queue_size] = q.getStat();
auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0;
auto total_count = blk_stat.totalCount();
auto total_bytes = blk_stat.totalBytes();
auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0;
auto approximate_max_pending_block_bytes = blk_avg_bytes * max_queue_size;
auto total_rows = blk_stat.totalRows();
LOG_INFO(
log,
"Done. pool_id={} pop={} pop_empty={} pop_empty_ratio={} "
"max_queue_size={} blk_avg_bytes={} approximate_max_pending_block_bytes={:.2f}MB "
"total_count={} total_bytes={:.2f}MB total_rows={} avg_block_rows={} avg_rows_bytes={}B",
pool_id,
pop_times,
pop_empty_times,
pop_empty_ratio,
max_queue_size,
blk_avg_bytes,
approximate_max_pending_block_bytes / 1024.0 / 1024.0,
total_count,
total_bytes / 1024.0 / 1024.0,
total_rows,
total_count > 0 ? total_rows / total_count : 0,
total_rows > 0 ? total_bytes / total_rows : 0);
}
~SegmentReadTaskPool() override;

SegmentReadTaskPtr nextTask();
const std::unordered_map<GlobalSegmentID, SegmentReadTaskPtr> & getTasks();
@@ -163,6 +138,8 @@
UInt64 expected_merge_count,
bool enable_data_sharing);

void startTableScanning();
void finishTableScanning();
Int64 increaseUnorderedInputStreamRefCount();
Int64 decreaseUnorderedInputStreamRefCount();
Int64 getFreeBlockSlots() const;
@@ -237,13 +214,19 @@ class SegmentReadTaskPool

const Int64 block_slot_limit;
const Int64 active_segment_limit;
const Int64 seg_task_count;

const String res_group_name;
std::mutex ru_mu;
std::atomic<Int64> last_time_check_ru = 0;
std::atomic<bool> ru_is_exhausted = false;
std::atomic<UInt64> read_bytes_after_last_check = 0;

// Stats of table scanning.
Stopwatch waiting_start_clock; // Measures the duration from creating this object until reading starts.
Stopwatch scanning_wall_clock; // Measures the duration from the start of reading until it finishes.
std::atomic<Int64> scanning_execution_ns{0}; // Sum of the execution time of all read threads.

inline static std::atomic<uint64_t> pool_id_gen{1};
inline static BlockStat global_blk_stat;
static uint64_t nextPoolId() { return pool_id_gen.fetch_add(1, std::memory_order_relaxed); }