diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 33c0c8ffb0c..4b95250c4c2 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -319,7 +319,13 @@ struct Settings M(SettingUInt64, dt_checksum_frame_size, DBMS_DEFAULT_BUFFER_SIZE, "Frame size for delta tree stable storage") \ \ M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \ +<<<<<<< HEAD M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \ +======= + M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \ + M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \ + M(SettingDouble, dt_read_thread_count_scale, 1.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \ +>>>>>>> 5872f16c52 (Refine storage read thread count. (#6885)) \ M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \ M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index ce63627c3a2..8b52d500ef8 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1169,7 +1169,7 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "dt_enable_read_thread {}", global_context->getSettingsRef().dt_enable_read_thread); // `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`. DM::DMFileReaderPool::instance(); - DM::SegmentReaderPoolManager::instance().init(server_info); + DM::SegmentReaderPoolManager::instance().init(server_info.cpu_info.logical_cores, settings.dt_read_thread_count_scale); DM::SegmentReadTaskScheduler::instance(); { diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp index 8c03eb54c76..52141e39221 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp @@ -208,18 +208,19 @@ SegmentReaderPoolManager::SegmentReaderPoolManager() SegmentReaderPoolManager::~SegmentReaderPoolManager() = default; -void SegmentReaderPoolManager::init(const ServerInfo & server_info) +void SegmentReaderPoolManager::init(UInt32 logical_cpu_cores, double read_thread_count_scale) { + double total_thread_count = logical_cpu_cores * read_thread_count_scale; auto numa_nodes = getNumaNodes(log); - LOG_INFO(log, "numa_nodes {} => {}", numa_nodes.size(), numa_nodes); + RUNTIME_CHECK(!numa_nodes.empty()); + UInt32 thread_count_per_node = std::ceil(total_thread_count / numa_nodes.size()); for (const auto & node : numa_nodes) { - int thread_count = node.empty() ? server_info.cpu_info.logical_cores : node.size(); - reader_pools.push_back(std::make_unique(thread_count, node)); + reader_pools.push_back(std::make_unique(thread_count_per_node, node)); auto ids = reader_pools.back()->getReaderIds(); reader_ids.insert(ids.begin(), ids.end()); } - LOG_INFO(log, "num_readers={}", reader_ids.size()); + LOG_INFO(log, "numa_nodes={} number_of_readers={}", numa_nodes, reader_ids.size()); } void SegmentReaderPoolManager::addTask(MergedTaskPtr && task) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h index 0a0268df030..e0bf4f676c5 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.h @@ -59,7 +59,7 @@ class SegmentReaderPoolManager static SegmentReaderPoolManager pool_manager; return pool_manager; } - void init(const ServerInfo & server_info); + void init(UInt32 logical_cpu_cores, double read_thread_count_scale); ~SegmentReaderPoolManager(); DISALLOW_COPY_AND_MOVE(SegmentReaderPoolManager); diff --git a/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp index 738b7432018..7d15e01ba3a 100644 --- a/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp @@ -256,7 +256,9 @@ void dailyRandomTest(WorkloadOptions & opts) void initReadThread() { DB::ServerInfo server_info; - DB::DM::SegmentReaderPoolManager::instance().init(server_info); + DB::DM::SegmentReaderPoolManager::instance().init( + server_info.cpu_info.logical_cores, + TiFlashTestEnv::getGlobalContext().getSettingsRef().dt_read_thread_count_scale); DB::DM::SegmentReadTaskScheduler::instance(); DB::DM::DMFileReaderPool::instance(); } diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp index f176ffbf7d3..95bd2ea1c7a 100644 --- a/dbms/src/TestUtils/gtests_dbms_main.cpp +++ b/dbms/src/TestUtils/gtests_dbms_main.cpp @@ -63,7 +63,9 @@ int main(int argc, char ** argv) DB::ServerInfo server_info; // `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`. DB::DM::DMFileReaderPool::instance(); - DB::DM::SegmentReaderPoolManager::instance().init(server_info); + DB::DM::SegmentReaderPoolManager::instance().init( + server_info.cpu_info.logical_cores, + DB::tests::TiFlashTestEnv::getGlobalContext().getSettingsRef().dt_read_thread_count_scale); DB::DM::SegmentReadTaskScheduler::instance(); #ifdef FIU_ENABLE