Commit

This is an automated cherry-pick of pingcap#8702
Signed-off-by: ti-chi-bot <[email protected]>
JinheLin authored and ti-chi-bot committed Jan 18, 2024
1 parent 7618b28 commit a2e9ae5
Showing 2 changed files with 56 additions and 0 deletions.
18 changes: 18 additions & 0 deletions dbms/src/Interpreters/Settings.h
@@ -323,6 +323,24 @@ struct Settings
     M(SettingDouble, dt_read_thread_count_scale, 1.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
     M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \
     M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \
+<<<<<<< HEAD
+=======
+    M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
+    M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
+    M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
+    M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \
+    M(SettingUInt64, dt_small_file_size_threshold, 128 * 1024, "for dmfile, when the file size less than dt_small_file_size_threshold, it will be merged. If dt_small_file_size_threshold = 0, dmfile will just do as v2") \
+    M(SettingUInt64, dt_merged_file_max_size, 16 * 1024 * 1024, "Small files are merged into one or more files not larger than dt_merged_file_max_size") \
+    M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \
+    M(SettingUInt64, dt_write_page_cache_limit_size, 2 * 1024 * 1024, "Limit size per write batch when compute node writing to PageStorage cache") \
+    M(SettingDouble, io_thread_count_scale, 5.0, "Number of thread of IOThreadPool = number of logical cpu cores * io_thread_count_scale. Only has meaning at server startup.") \
+    M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \
+    M(SettingDouble, dt_prepare_stream_concurrency_scale, 2.0, "Concurrency of preparing streams of one query equals to num_streams * dt_prepare_stream_concurrency_scale.") \
+    M(SettingBool, dt_enable_delta_index_error_fallback, true, "Whether fallback to an empty delta index if a delta index error is detected") \
+    M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
+    M(SettingUInt64, init_thread_count_scale, 100, "Number of thread = number of logical cpu cores * init_thread_count_scale. It just works for thread pool for initStores and loadMetadata") \
+    M(SettingDouble, cpu_thread_count_scale, 1.0, "Number of thread of computation-intensive thread pool = number of logical cpu cores * cpu_thread_count_scale. Only takes effects at server startup.") \
+>>>>>>> 1614f4ad44 (Storages: Update the length of Block queue and the number of read threads (#8702))
     \
     M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
     M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
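Note that the conflict markers (`<<<<<<< HEAD` ... `>>>>>>>`) are part of the committed content: the automated cherry-pick could not be applied cleanly, so ti-chi-bot commits the unresolved conflict for a maintainer to resolve, and the markers themselves count toward the added lines. In this hunk the conflict arises because both sides touched the same region: HEAD already carries `dt_read_thread_count_scale` with default 1.0, while the cherry-picked change reintroduces it with default 2.0 alongside the other settings.

For context, Settings.h uses the X-macro pattern inherited from ClickHouse: each `M(TYPE, NAME, DEFAULT, DESCRIPTION)` entry is expanded more than once with different definitions of `M`, once to declare the setting members and again to attach their metadata. Below is a minimal standalone sketch of that pattern; `Setting<T>`, `APPLY_FOR_MY_SETTINGS`, and the dump loop are illustrative stand-ins, not the actual TiFlash definitions.

```cpp
#include <cstdint>
#include <iostream>

// Simplified stand-in for the typed setting wrappers (SettingUInt64, SettingDouble, ...).
template <typename T>
struct Setting
{
    T value;
    explicit Setting(T default_value) : value(default_value) {}
};
using SettingUInt64 = Setting<uint64_t>;
using SettingDouble = Setting<double>;

// One list of settings, expanded below with different definitions of M.
// The two entries mirror lines from the hunk above (HEAD-side values).
#define APPLY_FOR_MY_SETTINGS(M) \
    M(SettingDouble, dt_read_thread_count_scale, 1.0, "read threads = logical cores * scale") \
    M(SettingUInt64, dt_max_sharing_column_count, 5, "0 means disable data sharing")

struct Settings
{
    // Expansion 1: declare one member per entry, initialized to its default.
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT};
    APPLY_FOR_MY_SETTINGS(DECLARE)
#undef DECLARE

    // Expansion 2: reuse the same list, e.g. to dump every setting with its description.
    void dump() const
    {
#define PRINT(TYPE, NAME, DEFAULT, DESCRIPTION) \
    std::cout << #NAME << " = " << NAME.value << "  // " << DESCRIPTION << '\n';
        APPLY_FOR_MY_SETTINGS(PRINT)
#undef PRINT
    }
};

int main()
{
    Settings settings;
    settings.dump();
}
```

Because the list is a single macro, adding a setting is a one-line change — which is also why this diff is nothing but `M(...)` lines.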
38 changes: 38 additions & 0 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
@@ -162,7 +162,45 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
     return stream;
 }
 
+<<<<<<< HEAD
 void SegmentReadTaskPool::finishSegment(const SegmentPtr & seg)
+=======
+SegmentReadTaskPool::SegmentReadTaskPool(
+    int extra_table_id_index_,
+    const ColumnDefines & columns_to_read_,
+    const PushDownFilterPtr & filter_,
+    uint64_t max_version_,
+    size_t expected_block_size_,
+    ReadMode read_mode_,
+    SegmentReadTasks && tasks_,
+    AfterSegmentRead after_segment_read_,
+    const String & tracing_id,
+    bool enable_read_thread_,
+    Int64 num_streams_,
+    const String & res_group_name_)
+    : pool_id(nextPoolId())
+    , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
+    , extra_table_id_index(extra_table_id_index_)
+    , columns_to_read(columns_to_read_)
+    , filter(filter_)
+    , max_version(max_version_)
+    , expected_block_size(expected_block_size_)
+    , read_mode(read_mode_)
+    , tasks_wrapper(enable_read_thread_, std::move(tasks_))
+    , after_segment_read(after_segment_read_)
+    , log(Logger::get(tracing_id))
+    , unordered_input_stream_ref_count(0)
+    , exception_happened(false)
+    // If the queue is too short, only 1 in the extreme case, it may cause the computation thread
+    // to encounter empty queues frequently, resulting in too much waiting and thread context switching.
+    // We limit the length of block queue to be 1.5 times of `num_streams_`, and in the extreme case,
+    // when `num_streams_` is 1, `block_slot_limit` is at least 2.
+    , block_slot_limit(std::ceil(num_streams_ * 1.5))
+    // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible,
+    // situations where the computation may be faster and the storage layer may not be able to keep up.
+    , active_segment_limit(std::max(num_streams_, 2))
+    , res_group_name(res_group_name_)
+>>>>>>> 1614f4ad44 (Storages: Update the length of Block queue and the number of read threads (#8702))
 {
     after_segment_read(dm_context, seg);
     bool pool_finished = false;
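The two limits introduced in the constructor are simple functions of `num_streams_`: `block_slot_limit = ceil(num_streams_ * 1.5)` and `active_segment_limit = max(num_streams_, 2)`. A minimal sketch (not TiFlash code) that recomputes them for a few stream counts, illustrating the comments in the hunk above:

```cpp
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>

int main()
{
    for (int64_t num_streams : {1, 2, 4, 8})
    {
        // The block queue holds 1.5x the stream count, so even a single stream
        // gets a queue of length 2, reducing empty-queue waits and context switches.
        const auto block_slot_limit = static_cast<int64_t>(std::ceil(num_streams * 1.5));
        // At least 2 segments are read concurrently, so the storage layer is less
        // likely to fall behind a faster computation layer.
        const auto active_segment_limit = std::max(num_streams, int64_t{2});
        std::cout << "num_streams=" << num_streams << " -> block_slot_limit=" << block_slot_limit
                  << ", active_segment_limit=" << active_segment_limit << '\n';
    }
}
```

For example, `num_streams_ = 4` yields a block queue of 6 slots and up to 4 concurrently read segments, while `num_streams_ = 1` still gets 2 queue slots and 2 active segments.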
