From a2e9ae558eb98bd48184987d8588cb77a58c5cd8 Mon Sep 17 00:00:00 2001
From: jinhelin
Date: Thu, 18 Jan 2024 19:55:46 +0800
Subject: [PATCH] This is an automated cherry-pick of #8702

Signed-off-by: ti-chi-bot
---
 dbms/src/Interpreters/Settings.h              | 18 +++++++++
 .../DeltaMerge/SegmentReadTaskPool.cpp        | 38 +++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index 54b41197eeb..997965e7f65 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -323,6 +323,24 @@ struct Settings
     M(SettingDouble, dt_read_thread_count_scale, 1.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
     M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \
     M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \
+<<<<<<< HEAD
+=======
+    M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
+    M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
+    M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
+    M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \
+    M(SettingUInt64, dt_small_file_size_threshold, 128 * 1024, "for dmfile, when the file size less than dt_small_file_size_threshold, it will be merged. If dt_small_file_size_threshold = 0, dmfile will just do as v2") \
+    M(SettingUInt64, dt_merged_file_max_size, 16 * 1024 * 1024, "Small files are merged into one or more files not larger than dt_merged_file_max_size") \
+    M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \
+    M(SettingUInt64, dt_write_page_cache_limit_size, 2 * 1024 * 1024, "Limit size per write batch when compute node writing to PageStorage cache") \
+    M(SettingDouble, io_thread_count_scale, 5.0, "Number of thread of IOThreadPool = number of logical cpu cores * io_thread_count_scale. Only has meaning at server startup.") \
+    M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \
+    M(SettingDouble, dt_prepare_stream_concurrency_scale, 2.0, "Concurrency of preparing streams of one query equals to num_streams * dt_prepare_stream_concurrency_scale.") \
+    M(SettingBool, dt_enable_delta_index_error_fallback, true, "Whether fallback to an empty delta index if a delta index error is detected") \
+    M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
+    M(SettingUInt64, init_thread_count_scale, 100, "Number of thread = number of logical cpu cores * init_thread_count_scale. It just works for thread pool for initStores and loadMetadata") \
+    M(SettingDouble, cpu_thread_count_scale, 1.0, "Number of thread of computation-intensive thread pool = number of logical cpu cores * cpu_thread_count_scale. Only takes effects at server startup.") \
+>>>>>>> 1614f4ad44 (Storages: Update the length of Block queue and the number of read threads (#8702))
     M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
     M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \

diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
index cf9775ac8f0..38ec53dd6cc 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
@@ -162,7 +162,45 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
     return stream;
 }
 
+<<<<<<< HEAD
 void SegmentReadTaskPool::finishSegment(const SegmentPtr & seg)
+=======
+SegmentReadTaskPool::SegmentReadTaskPool(
+    int extra_table_id_index_,
+    const ColumnDefines & columns_to_read_,
+    const PushDownFilterPtr & filter_,
+    uint64_t max_version_,
+    size_t expected_block_size_,
+    ReadMode read_mode_,
+    SegmentReadTasks && tasks_,
+    AfterSegmentRead after_segment_read_,
+    const String & tracing_id,
+    bool enable_read_thread_,
+    Int64 num_streams_,
+    const String & res_group_name_)
+    : pool_id(nextPoolId())
+    , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
+    , extra_table_id_index(extra_table_id_index_)
+    , columns_to_read(columns_to_read_)
+    , filter(filter_)
+    , max_version(max_version_)
+    , expected_block_size(expected_block_size_)
+    , read_mode(read_mode_)
+    , tasks_wrapper(enable_read_thread_, std::move(tasks_))
+    , after_segment_read(after_segment_read_)
+    , log(Logger::get(tracing_id))
+    , unordered_input_stream_ref_count(0)
+    , exception_happened(false)
+    // If the queue is too short, only 1 in the extreme case, it may cause the computation thread
+    // to encounter empty queues frequently, resulting in too much waiting and thread context switching.
+    // We limit the length of block queue to be 1.5 times of `num_streams_`, and in the extreme case,
+    // when `num_streams_` is 1, `block_slot_limit` is at least 2.
+    , block_slot_limit(std::ceil(num_streams_ * 1.5))
+    // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible,
+    // situations where the computation may be faster and the storage layer may not be able to keep up.
+    , active_segment_limit(std::max(num_streams_, 2))
+    , res_group_name(res_group_name_)
+>>>>>>> 1614f4ad44 (Storages: Update the length of Block queue and the number of read threads (#8702))
 {
     after_segment_read(dm_context, seg);
     bool pool_finished = false;