diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index 04cdaceed61..4a2e452b59c 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1238,6 +1238,9 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
         this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read);
     };
     size_t final_num_stream = std::min(num_streams, tasks.size());
+    String req_info;
+    if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
+        req_info = db_context.getDAGContext()->getMPPTaskId().toString();
     auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
         physical_table_id,
         dm_context,
@@ -1248,11 +1251,9 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
         /* is_raw = */ true,
         /* do_delete_mark_filter_for_raw = */ false,
         std::move(tasks),
-        after_segment_read);
+        after_segment_read,
+        req_info);
 
-    String req_info;
-    if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
-        req_info = db_context.getDAGContext()->getMPPTaskId().toString();
     BlockInputStreams res;
     for (size_t i = 0; i < final_num_stream; ++i)
     {
@@ -1311,12 +1312,12 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
     auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order;
     // SegmentReadTaskScheduler and SegmentReadTaskPool use table_id + segment id as unique ID when read thread is enabled.
     // 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
-    // Also, too many read tasks of a segment with different samll ranges is not good for data sharing cache.
+    // Also, too many read tasks of a segment with different small ranges is not good for data sharing cache.
     SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments, /*try_split_task =*/!enable_read_thread);
 
     auto tracing_logger = Logger::get(log->name(), dm_context->tracing_id);
     LOG_FMT_DEBUG(tracing_logger,
-                  "Read create segment snapshot done keep_order {} dt_enable_read_thread {} => enable_read_thread {}",
+                  "Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={}",
                   keep_order,
                   db_context.getSettingsRef().dt_enable_read_thread,
                   enable_read_thread);
@@ -1338,7 +1339,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
         /* is_raw = */ is_fast_scan,
         /* do_delete_mark_filter_for_raw = */ is_fast_scan,
         std::move(tasks),
-        after_segment_read);
+        after_segment_read,
+        tracing_id);
 
     String req_info;
     if (db_context.getDAGContext() != nullptr && db_context.getDAGContext()->isMPPTask())
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp
index 280026eba75..aa8cc8ced1f 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/CPU.cpp
@@ -71,39 +71,31 @@ std::vector<std::vector<int>> getLinuxNumaNodes()
     if (!nodes.exists() || !nodes.isDirectory())
     {
         auto cpus = getCPUs(cpus_dir_name);
-        if (cpus.empty())
-        {
-            throw Exception("Not recognize CPU: " + cpus_dir_name);
-        }
+        RUNTIME_CHECK_MSG(!cpus.empty(), "Not recognize CPU: {}", cpus_dir_name);
         numa_nodes.push_back(std::move(cpus));
+        return numa_nodes;
     }
-    else
+
+    // get the cpu id from each NUMA node
+    Poco::DirectoryIterator end;
+    for (Poco::DirectoryIterator iter(nodes); iter != end; ++iter)
     {
-        Poco::DirectoryIterator end;
-        for (Poco::DirectoryIterator iter(nodes); iter != end; ++iter)
+        if (!isNodeDir(iter.name()))
         {
-            if (!isNodeDir(iter.name()))
-            {
-                continue;
-            }
-            auto dir_name = nodes_dir_name + "/" + iter.name();
-            auto cpus = getCPUs(dir_name);
-            if (cpus.empty())
-            {
-                throw Exception("Not recognize CPU: " + nodes_dir_name);
-            }
-            numa_nodes.push_back(std::move(cpus));
+            continue;
         }
+        auto dir_name = nodes_dir_name + "/" + iter.name();
+        auto cpus = getCPUs(dir_name);
+        RUNTIME_CHECK_MSG(!cpus.empty(), "Not recognize CPU: {}", nodes_dir_name);
+        numa_nodes.push_back(std::move(cpus));
     }
-    if (numa_nodes.empty())
-    {
-        throw Exception("Not recognize CPU");
-    }
+    RUNTIME_CHECK_MSG(!numa_nodes.empty(), "Not recognize CPU");
     return numa_nodes;
 }
 
 std::vector<std::vector<int>> getNumaNodes(Poco::Logger * log)
 {
+#ifndef __APPLE__ // Apple macbooks does not support NUMA
     try
     {
         return getLinuxNumaNodes();
@@ -120,6 +112,7 @@ std::vector<std::vector<int>> getNumaNodes(Poco::Logger * log)
     {
         LOG_FMT_WARNING(log, "Unknow Error");
     }
+#endif
     LOG_FMT_WARNING(log, "Cannot recognize the CPU NUMA infomation, use the CPU as 'one numa node'");
     std::vector<std::vector<int>> numa_nodes(1); // "One numa node"
     return numa_nodes;
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
index 446e6777338..57d60eae77b 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
@@ -194,7 +194,7 @@ class ColumnSharingCacheMap
         auto add_total = add_count + add_stale;
         auto get_cached = get_hit + get_copy;
         auto get_total = get_miss + get_part + get_hit + get_copy;
-        return fmt::format("add_count {} add_stale {} add_ratio {} get_miss {} get_part {} get_hit {} get_copy {} cached_ratio {}",
+        return fmt::format("add_count={} add_stale={} add_ratio={} get_miss={} get_part={} get_hit={} get_copy={} cached_ratio={}",
                            add_count,
                            add_stale,
                            add_total > 0 ? add_count * 1.0 / add_total : 0,
@@ -218,7 +218,6 @@ class DMFileReaderPool
 {
 public:
     static DMFileReaderPool & instance();
-    DMFileReaderPool() = default;
     ~DMFileReaderPool() = default;
 
     DISALLOW_COPY_AND_MOVE(DMFileReaderPool);
@@ -226,9 +225,12 @@ class DMFileReaderPool
     void del(DMFileReader & reader);
     void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col);
 
+private:
+    DMFileReaderPool() = default;
+
 private:
     std::mutex mtx;
     std::unordered_map> readers;
 };
 
-} // namespace DB::DM
\ No newline at end of file
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
index df7f261bf14..b7725618fc0 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -21,7 +21,7 @@ SegmentReadTaskScheduler::SegmentReadTaskScheduler()
     : stop(false)
     , log(&Poco::Logger::get("SegmentReadTaskScheduler"))
 {
-    sched_thread = std::thread(&SegmentReadTaskScheduler::schedThread, this);
+    sched_thread = std::thread(&SegmentReadTaskScheduler::schedLoop, this);
 }
 
 SegmentReadTaskScheduler::~SegmentReadTaskScheduler()
@@ -43,12 +43,12 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
         merging_segments[pool->tableId()][seg_id].push_back(pool->poolId());
         if (!seg_ids.insert(seg_id).second)
         {
-            throw DB::Exception(fmt::format("Not support split segment task. seg_ids {} => seg_id {} already exist.", seg_ids, seg_id));
+            throw DB::Exception(fmt::format("Not support split segment task. segment_ids={} => segment_id={} already exist.", seg_ids, seg_id));
         }
     }
     auto block_slots = pool->getFreeBlockSlots();
     auto [unexpired, expired] = read_pools.count(pool->tableId());
-    LOG_FMT_DEBUG(log, "add pool {} table {} block_slots {} segment count {} segments {} unexpired pool {} expired pool {}", //
+    LOG_FMT_DEBUG(log, "Added, pool_id={} table_id={} block_slots={} segment_count={} segments={} unexpired_pool={} expired_pool={}", //
                   pool->poolId(),
                   pool->tableId(),
                   block_slots,
@@ -200,13 +200,13 @@ bool SegmentReadTaskScheduler::schedule()
     auto [merged_task, run_sche] = scheduleMergedTask();
     if (merged_task != nullptr)
     {
-        LOG_FMT_DEBUG(log, "scheduleMergedTask seg_id {} pools {} => {} ms", merged_task->getSegmentId(), merged_task->getPoolIds(), sw.elapsedMilliseconds());
+        LOG_FMT_DEBUG(log, "scheduleMergedTask segment_id={} pool_ids={} cost={}ms", merged_task->getSegmentId(), merged_task->getPoolIds(), sw.elapsedMilliseconds());
         SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
     }
     return run_sche;
 }
 
-void SegmentReadTaskScheduler::schedThread()
+void SegmentReadTaskScheduler::schedLoop()
 {
     while (!isStop())
     {
@@ -218,4 +218,4 @@
     }
 }
 
-} // namespace DB::DM
\ No newline at end of file
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
index 51aef4d176c..aae98115da5 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
@@ -55,13 +55,13 @@ class SegmentReadTaskScheduler
     SegmentReadTaskScheduler();
 
     // Choose segment to read.
-    // Returns
+    // Returns
     std::pair<MergedTaskPtr, bool> scheduleMergedTask();
 
     void setStop();
     bool isStop() const;
     bool schedule();
-    void schedThread();
+    void schedLoop();
 
     SegmentReadTaskPools getPoolsUnlock(const std::vector<uint64_t> & pool_ids);
     //
@@ -80,4 +80,4 @@
     Poco::Logger * log;
 };
 
-} // namespace DB::DM
\ No newline at end of file
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp
index f83a0ac8313..216527e506f 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp
@@ -42,9 +42,9 @@ class SegmentReader
 
     ~SegmentReader()
     {
-        LOG_FMT_DEBUG(log, "SegmentReader stop begin");
+        LOG_FMT_DEBUG(log, "Stop begin");
         t.join();
-        LOG_FMT_DEBUG(log, "SegmentReader stop end");
+        LOG_FMT_DEBUG(log, "Stop end");
     }
 
     std::thread::id getId() const
@@ -70,11 +70,11 @@ class SegmentReader
         if (ret != 0)
        {
             // It can be failed due to some CPU core cannot access, such as CPU offline.
-            LOG_FMT_WARNING(log, "sched_setaffinity cpus {} fail: {}", cpus, std::strerror(errno));
+            LOG_FMT_WARNING(log, "sched_setaffinity fail, cpus={} errno={}", cpus, std::strerror(errno));
         }
         else
         {
-            LOG_FMT_DEBUG(log, "sched_setaffinity cpus {} succ", cpus);
+            LOG_FMT_DEBUG(log, "sched_setaffinity succ, cpus={}", cpus);
         }
 #endif
     }
@@ -91,7 +91,7 @@ class SegmentReader
         {
             if (!task_queue.pop(merged_task))
             {
-                LOG_FMT_INFO(log, "pop fail, stop {}", isStop());
+                LOG_FMT_INFO(log, "Pop fail, stop={}", isStop());
                 return;
             }
 
@@ -114,7 +114,7 @@ class SegmentReader
             }
             if (read_count <= 0)
             {
-                LOG_FMT_DEBUG(log, "pool {} seg_id {} read_count {}", merged_task->getPoolIds(), merged_task->getSegmentId(), read_count);
+                LOG_FMT_DEBUG(log, "All finished, pool_ids={} segment_id={} read_count={}", merged_task->getPoolIds(), merged_task->getSegmentId(), read_count);
             }
         }
         catch (DB::Exception & e)
@@ -160,6 +160,8 @@ class SegmentReader
     std::vector<int> cpus;
 };
 
+// ===== SegmentReaderPool ===== //
+
 void SegmentReaderPool::addTask(MergedTaskPtr && task)
 {
     if (!task_queue.push(std::forward<MergedTaskPtr>(task), nullptr))
@@ -171,12 +173,12 @@ void SegmentReaderPool::addTask(MergedTaskPtr && task)
 SegmentReaderPool::SegmentReaderPool(int thread_count, const std::vector<int> & cpus)
     : log(&Poco::Logger::get("SegmentReaderPool"))
 {
-    LOG_FMT_INFO(log, "Create SegmentReaderPool thread_count {} cpus {} start", thread_count, cpus);
+    LOG_FMT_INFO(log, "Create start, thread_count={} cpus={}", thread_count, cpus);
     for (int i = 0; i < thread_count; i++)
     {
         readers.push_back(std::make_unique<SegmentReader>(task_queue, cpus));
     }
-    LOG_FMT_INFO(log, "Create SegmentReaderPool thread_count {} cpus {} end", thread_count, cpus);
+    LOG_FMT_INFO(log, "Create end, thread_count={} cpus={}", thread_count, cpus);
 }
 
 SegmentReaderPool::~SegmentReaderPool()
@@ -198,6 +200,8 @@ std::vector<std::thread::id> SegmentReaderPool::getReaderIds() const
     return ids;
 }
 
+// ===== SegmentReaderPoolManager ===== //
+
 SegmentReaderPoolManager::SegmentReaderPoolManager()
     : log(&Poco::Logger::get("SegmentReaderPoolManager"))
 {}
@@ -215,7 +219,7 @@ void SegmentReaderPoolManager::init(const ServerInfo & server_info)
         auto ids = reader_pools.back()->getReaderIds();
         reader_ids.insert(ids.begin(), ids.end());
     }
-    LOG_FMT_INFO(log, "readers count {}", reader_ids.size());
+    LOG_FMT_INFO(log, "num_readers={}", reader_ids.size());
 }
 
 void SegmentReaderPoolManager::addTask(MergedTaskPtr && task)
@@ -233,4 +237,4 @@ bool SegmentReaderPoolManager::isSegmentReader() const
 {
     return reader_ids.find(std::this_thread::get_id()) != reader_ids.end();
 }
-} // namespace DB::DM
\ No newline at end of file
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h
index 5408dcb4fd6..fc2a26b254f 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #pragma once
+#include 
 #include 
 #include 
 #include 
@@ -31,10 +32,8 @@ class SegmentReaderPool
 public:
     SegmentReaderPool(int thread_count, const std::vector<int> & cpus);
     ~SegmentReaderPool();
-    SegmentReaderPool(const SegmentReaderPool &) = delete;
-    SegmentReaderPool & operator=(const SegmentReaderPool &) = delete;
-    SegmentReaderPool(SegmentReaderPool &&) = delete;
-    SegmentReaderPool & operator=(SegmentReaderPool &&) = delete;
+
+    DISALLOW_COPY_AND_MOVE(SegmentReaderPool);
 
     void addTask(MergedTaskPtr && task);
     std::vector<std::thread::id> getReaderIds() const;
@@ -74,4 +73,4 @@ class SegmentReaderPoolManager
     Poco::Logger * log;
 };
 
-} // namespace DB::DM
\ No newline at end of file
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
index 5bdbc7ed59d..3c609088798 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
@@ -45,18 +45,18 @@ class UnorderedInputStream : public IProfilingBlockInputStream
     {
         if (extra_table_id_index != InvalidColumnID)
         {
-            ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
+            auto & extra_table_id_col_define = getExtraTableIDColumnDefine();
             ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value};
             header.insert(extra_table_id_index, col);
         }
         ref_no = task_pool->increaseUnorderedInputStreamRefCount();
-        LOG_FMT_DEBUG(log, "pool {} ref {} created", task_pool->poolId(), ref_no);
+        LOG_FMT_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
     }
 
     ~UnorderedInputStream()
     {
         task_pool->decreaseUnorderedInputStreamRefCount();
-        LOG_FMT_DEBUG(log, "pool {} ref {} destroy", task_pool->poolId(), ref_no);
+        LOG_FMT_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
     }
 
     String getName() const override { return NAME; }
@@ -70,7 +70,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
         return readImpl(filter_ignored, false);
     }
 
-    // Currently, res_fiter and return_filter is unused.
+    // Currently, res_filter and return_filter is unused.
     Block readImpl(FilterPtr & /*res_filter*/, bool /*return_filter*/) override
     {
         if (done)
@@ -87,7 +87,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
         {
             if (extra_table_id_index != InvalidColumnID)
             {
-                ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
+                auto & extra_table_id_col_define = getExtraTableIDColumnDefine();
                 ColumnWithTypeAndName col{{}, extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id};
                 size_t row_number = res.rows();
                 auto col_data = col.type->createColumnConst(row_number, Field(physical_table_id));
@@ -114,7 +114,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
 
     void readSuffixImpl() override
     {
-        LOG_FMT_DEBUG(log, "pool {} ref {} finish read {} rows from storage", task_pool->poolId(), ref_no, total_rows);
+        LOG_FMT_DEBUG(log, "Finish read from storage, pool_id={} ref_no={} rows={}", task_pool->poolId(), ref_no, total_rows);
     }
 
 private:
@@ -128,4 +128,4 @@ class UnorderedInputStream : public IProfilingBlockInputStream
     int64_t ref_no;
     size_t total_rows = 0;
 };
-} // namespace DB::DM
\ No newline at end of file
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
index 7726617ebef..c8f595cc69a 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
@@ -25,13 +25,13 @@ template <typename T>
 class WorkQueue
 {
     // Protects all member variable access
-    std::mutex mutex_;
-    std::condition_variable readerCv_;
-    std::condition_variable writerCv_;
-    std::condition_variable finishCv_;
-    std::queue<T> queue_;
-    bool done_;
-    std::size_t maxSize_;
+    std::mutex mu;
+    std::condition_variable reader_cv;
+    std::condition_variable writer_cv;
+    std::condition_variable finish_cv;
+    std::queue<T> queue;
+    bool done;
+    std::size_t max_size;
 
     std::size_t peak_queue_size;
     int64_t pop_times;
@@ -39,11 +39,11 @@ class WorkQueue
     // Must have lock to call this function
     bool full() const
     {
-        if (maxSize_ == 0)
-        {
-            return false;
-        }
-        return queue_.size() >= maxSize_;
+        if (max_size == 0)
+        {
+            return false;
+        }
+        return queue.size() >= max_size;
     }
 
 public:
@@ -53,9 +53,9 @@ class WorkQueue
      *
     * @param maxSize The maximum allowed size of the work queue.
     */
-    WorkQueue(std::size_t maxSize = 0)
-        : done_(false)
-        , maxSize_(maxSize)
+    explicit WorkQueue(std::size_t maxSize = 0)
+        : done(false)
+        , max_size(maxSize)
         , peak_queue_size(0)
         , pop_times(0)
         , pop_empty_times(0)
@@ -73,23 +73,23 @@ class WorkQueue
     bool push(U && item, size_t * size)
     {
         {
-            std::unique_lock lock(mutex_);
-            while (full() && !done_)
+            std::unique_lock lock(mu);
+            while (full() && !done)
             {
-                writerCv_.wait(lock);
+                writer_cv.wait(lock);
             }
-            if (done_)
+            if (done)
             {
                 return false;
             }
-            queue_.push(std::forward<U>(item));
-            peak_queue_size = std::max(queue_.size(), peak_queue_size);
+            queue.push(std::forward<U>(item));
+            peak_queue_size = std::max(queue.size(), peak_queue_size);
             if (size != nullptr)
             {
-                *size = queue_.size();
+                *size = queue.size();
             }
         }
-        readerCv_.notify_one();
+        reader_cv.notify_one();
         return true;
     }
     /**
@@ -104,22 +104,22 @@ class WorkQueue
     bool pop(T & item)
     {
         {
-            std::unique_lock lock(mutex_);
+            std::unique_lock lock(mu);
             pop_times++;
-            while (queue_.empty() && !done_)
+            while (queue.empty() && !done)
             {
                 pop_empty_times++;
-                readerCv_.wait(lock);
+                reader_cv.wait(lock);
             }
-            if (queue_.empty())
+            if (queue.empty())
             {
-                assert(done_);
+                assert(done);
                 return false;
             }
-            item = std::move(queue_.front());
-            queue_.pop();
+            item = std::move(queue.front());
+            queue.pop();
         }
-        writerCv_.notify_one();
+        writer_cv.notify_one();
         return true;
     }
     /**
@@ -130,10 +130,10 @@ class WorkQueue
     void setMaxSize(std::size_t maxSize)
     {
         {
-            std::lock_guard lock(mutex_);
-            maxSize_ = maxSize;
+            std::lock_guard lock(mu);
+            max_size = maxSize;
         }
-        writerCv_.notify_all();
+        writer_cv.notify_all();
     }
     /**
     * Promise that `push()` won't be called again, so once the queue is empty
@@ -142,28 +142,28 @@ class WorkQueue
     void finish()
     {
         {
-            std::lock_guard lock(mutex_);
-            assert(!done_);
-            done_ = true;
+            std::lock_guard lock(mu);
+            assert(!done);
+            done = true;
         }
-        readerCv_.notify_all();
-        writerCv_.notify_all();
-        finishCv_.notify_all();
+        reader_cv.notify_all();
+        writer_cv.notify_all();
+        finish_cv.notify_all();
     }
     /// Blocks until `finish()` has been called (but the queue may not be empty).
     void waitUntilFinished()
     {
-        std::unique_lock lock(mutex_);
-        while (!done_)
+        std::unique_lock lock(mu);
+        while (!done)
         {
-            finishCv_.wait(lock);
+            finish_cv.wait(lock);
         }
     }
 
     size_t size()
     {
-        std::lock_guard lock(mutex_);
-        return queue_.size();
+        std::lock_guard lock(mu);
+        return queue.size();
     }
 
     std::tuple<int64_t, int64_t, size_t> getStat() const
@@ -171,4 +171,4 @@ class WorkQueue
         return std::tuple{pop_times, pop_empty_times, peak_queue_size};
     }
 };
-} // namespace DB::DM
\ No newline at end of file
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
index 431cc9fb1a3..32cd22dfb18 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
@@ -109,7 +109,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
         auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
         stream = seg->getInputStream(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size);
     }
-    LOG_FMT_DEBUG(log, "getInputStream pool {} seg_id {} succ", pool_id, seg->segmentId());
+    LOG_FMT_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, seg->segmentId());
     return stream;
 }
 
@@ -122,7 +122,7 @@ void SegmentReadTaskPool::finishSegment(const SegmentPtr & seg)
         active_segment_ids.erase(seg->segmentId());
         pool_finished = active_segment_ids.empty() && tasks.empty();
     }
-    LOG_FMT_DEBUG(log, "finishSegment pool {} seg_id {} pool_finished {}", pool_id, seg->segmentId(), pool_finished);
+    LOG_FMT_DEBUG(log, "finishSegment pool_id={} segment_id={} pool_finished={}", pool_id, seg->segmentId(), pool_finished);
     if (pool_finished)
     {
         q.finish();
@@ -136,7 +136,7 @@ SegmentReadTaskPtr SegmentReadTaskPool::getTask(uint64_t seg_id)
     auto itr = std::find_if(tasks.begin(), tasks.end(), [seg_id](const SegmentReadTaskPtr & task) { return task->segment->segmentId() == seg_id; });
     if (itr == tasks.end())
     {
-        throw Exception(fmt::format("pool {} seg_id {} not found", pool_id, seg_id));
+        throw Exception(fmt::format("{} pool_id={} segment_id={} not found", __PRETTY_FUNCTION__, pool_id, seg_id));
     }
     auto t = *(itr);
     tasks.erase(itr);
@@ -159,11 +159,11 @@ std::unordered_map>::const_iterator SegmentReadT
     auto itr = segments.find(task->segment->segmentId());
     if (itr == segments.end())
     {
-        throw DB::Exception(fmt::format("seg_id {} not found from merging segments", task->segment->segmentId()));
+        throw DB::Exception(fmt::format("segment_id {} not found from merging segments", task->segment->segmentId()));
     }
     if (std::find(itr->second.begin(), itr->second.end(), poolId()) == itr->second.end())
     {
-        throw DB::Exception(fmt::format("pool {} not found from merging segment {}=>{}", poolId(), itr->first, itr->second));
+        throw DB::Exception(fmt::format("pool_id={} not found from merging segment {}=>{}", poolId(), itr->first, itr->second));
     }
     if (target == segments.end() || itr->second.size() > target->second.size())
     {
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
index c878543dda6..eb4e987dec7 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -124,7 +124,8 @@ class SegmentReadTaskPool : private boost::noncopyable
         bool is_raw_,
         bool do_range_filter_for_raw_,
         SegmentReadTasks && tasks_,
-        AfterSegmentRead after_segment_read_)
+        AfterSegmentRead after_segment_read_,
+        const String & tracing_id)
         : pool_id(nextPoolId())
         , table_id(table_id_)
         , dm_context(dm_context_)
@@ -136,7 +137,7 @@ class SegmentReadTaskPool : private boost::noncopyable
         , do_range_filter_for_raw(do_range_filter_for_raw_)
         , tasks(std::move(tasks_))
         , after_segment_read(after_segment_read_)
-        , log(&Poco::Logger::get("SegmentReadTaskPool"))
+        , log(Logger::get("SegmentReadTaskPool", tracing_id))
         , unordered_input_stream_ref_count(0)
         , exception_happened(false)
         , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
@@ -150,7 +151,7 @@ class SegmentReadTaskPool : private boost::noncopyable
         auto total_bytes = blk_stat.totalBytes();
         auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0;
         auto approximate_max_pending_block_bytes = blk_avg_bytes * max_queue_size;
-        LOG_FMT_DEBUG(log, "pool {} table {} pop {} pop_empty {} pop_empty_ratio {} max_queue_size {} blk_avg_bytes {} approximate_max_pending_block_bytes {} MB total_count {} total_bytes {} MB", //
+        LOG_FMT_DEBUG(log, "Done. pool_id={} table_id={} pop={} pop_empty={} pop_empty_ratio={} max_queue_size={} blk_avg_bytes={} approximate_max_pending_block_bytes={:.2f}MB total_count={} total_bytes={:.2f}MB", //
                       pool_id,
                       table_id,
                       pop_times,
@@ -216,7 +217,7 @@ class SegmentReadTaskPool : private boost::noncopyable
     std::unordered_set<uint64_t> active_segment_ids;
     WorkQueue<Block> q;
     BlockStat blk_stat;
-    Poco::Logger * log;
+    LoggerPtr log;
 
     std::atomic<int64_t> unordered_input_stream_ref_count;
 
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index 8c9f115a472..f62349794d8 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include 
 #include 
 #include 
 #include 