From d4f7333d3141220f7cc4290c8eaea2f07bbfbf15 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 18 Jan 2024 15:33:47 +0800 Subject: [PATCH] Storages: Update the length of Block queue and the number of read threads --- dbms/src/Interpreters/Settings.h | 2 +- dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 856c064c7c1..bd57841d496 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -222,7 +222,7 @@ struct Settings M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \ M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \ M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \ - M(SettingDouble, dt_read_thread_count_scale, 1.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \ + M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \ M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \ M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \ M(SettingUInt64, dt_small_file_size_threshold, 128 * 1024, "for dmfile, when the file size less than dt_small_file_size_threshold, it will be merged. If dt_small_file_size_threshold = 0, dmfile will just do as v2") \ diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 35c89ff69dd..457b7750d45 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -224,9 +224,10 @@ SegmentReadTaskPool::SegmentReadTaskPool( , unordered_input_stream_ref_count(0) , exception_happened(false) // If the queue is too short, only 1 in the extreme case, it may cause the computation thread - // to encounter empty queues frequently, resulting in too much waiting and thread context - // switching, so we limit the lower limit to 3, which provides two blocks of buffer space. - , block_slot_limit(std::max(num_streams_, 3)) + // to encounter empty queues frequently, resulting in too much waiting and thread context switching. + // We limit the length of block queue to be 1.5 times of `num_streams_`, and in the extreme case, + // when `num_streams_` is 1, `block_slot_limit` is at least 2. + , block_slot_limit(std::ceil(num_streams_ * 1.5)) // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible, // situations where the computation may be faster and the storage layer may not be able to keep up. , active_segment_limit(std::max(num_streams_, 2))