[fix](scanner) Fix incorrect _max_thread_num in scanner context #40569
Merged
Changes shown from 4 of 9 commits:

- 6baa417 X (zhiqiang-hhhh)
- 23a2efd REF (zhiqiang-hhhh)
- d37aa09 X (zhiqiang-hhhh)
- e91aa7d X (zhiqiang-hhhh)
- 76ae129 FIX (zhiqiang-hhhh)
- d6b88d8 X (zhiqiang-hhhh)
- c1d693a Merge branch 'master' of https://github.com/apache/doris into refacto… (zhiqiang-hhhh)
- 0e051e5 X (zhiqiang-hhhh)
- aa5f882 X (zhiqiang-hhhh)
@@ -42,8 +42,7 @@ ScannerContext::ScannerContext(
        RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
        const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
        const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
        int64_t max_bytes_in_blocks_queue, std::shared_ptr<pipeline::Dependency> dependency,
        const int num_parallel_instances)
        std::shared_ptr<pipeline::Dependency> dependency)
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
@@ -53,53 +52,102 @@ ScannerContext::ScannerContext(
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
                              num_parallel_instances),
          _scanner_scheduler(state->exec_env()->scanner_scheduler()),
          _all_scanners(scanners.begin(), scanners.end()),
          _num_parallel_instances(num_parallel_instances) {
          _all_scanners(scanners.begin(), scanners.end()) {
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    ctx_id = UniqueId::gen_uid().to_string();
    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
    if (scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }
    _scanners.enqueue_bulk(scanners.begin(), scanners.size());
    if (limit < 0) {
        limit = -1;
    }
    MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
    _query_thread_context = {_query_id, _state->query_mem_tracker(),
                             _state->get_query_ctx()->workload_group()};
    _dependency = dependency;
}

// After init function call, should not access _parent
Status ScannerContext::init(bool ignore_data_distribution) {
    _scanner_profile = _local_state->_scanner_profile;
    _scanner_sched_counter = _local_state->_scanner_sched_counter;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
    _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
    _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;

#ifndef BE_TEST
    // 3. get thread token
    if (_state->get_query_ctx()) {
        thread_token = _state->get_query_ctx()->get_token();
        _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
        if (_simple_scan_scheduler) {
            _should_reset_thread_name = false;
        }
        _remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
    }
#endif
    _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
                                                    thread_token == nullptr ? "False" : "True");

    int num_parallel_instances = _state->query_parallel_instance_num();

    // NOTE: When ignore_data_distribution is true or if_file_scan is false, the parallelism
    // of the scan operator is regarded as 1 (which may actually not be the case).
    // That allows a very large number of scan tasks to be submitted to the scheduler.
    // This logic is kept from the older implementation.
    // https://github.com/apache/doris/pull/28266
    if (ignore_data_distribution) {
        num_parallel_instances = 1;
    }

    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
    // scan_queue_mem_limit on FE is 100MB by default; on the backend we make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue =
            std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 1024 * 10);

Review comment on this line (translated): It should be 10MB, not 10GB.
Author reply: fixed

    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // TODO: Where is the proper position to place this code?
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
    }

    // _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time.
    // The overall target of our system is to make full utilization of the resources.
    // At the same time, we don't want too many tasks queued by the scheduler; that makes the query
    // wait too long, and existing tasks cannot be scheduled in time.
    // First of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
    // config::doris_scanner_thread_pool_thread_num.
    // At the same time, we don't want too many tasks queued by the scheduler; that is not necessary.
    // So, first of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
    // 2 * config::doris_scanner_thread_pool_thread_num, so that we can keep all io threads busy.
    // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 = 128,
    // and the num_parallel_instances of this scan operator will be 64/2 = 32.
    // For a query that has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
    // We have 32 instances of this scan operator, so for the ScanNode, 4 * 32 = 128 scanner tasks can be submitted at a time.
    // Remember that we have two ScanNodes in this query, so the total number of scanner tasks that can be submitted at a time is 128 * 2 = 256.
    // For a query that has one scan node, the _max_thread_num of each scan node instance will be 2 * 128 / 32 = 8.
    // We have 32 instances of this scan operator, so for the ScanNode, 8 * 32 = 256 scanner tasks can be submitted at a time.
    // The thread pool of the scanner is 128, which means we will have 128 tasks running in parallel and another 128 tasks waiting in the queue.
    // When the first 128 tasks are finished, the next 128 tasks will be taken from the queue and executed,
    // and another 128 tasks will be submitted to the queue if there are any remaining.
    _max_thread_num =
            _state->num_scanner_threads() > 0
                    ? _state->num_scanner_threads()
                    : config::doris_scanner_thread_pool_thread_num / num_parallel_instances;
    _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
    // In some situations, there are not too many big tablets involved, so we can reduce the thread number.
    _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
    // NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0.
    _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());

    // 1. Calculate max concurrency
    // For select * from table limit 10; should just use one thread.
    if (_local_state->should_run_serial()) {
        _max_thread_num = 1;
    }

    // When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num,
    // because we found that in a table with 5k columns, the column readers may occupy too much memory.
    // You can refer to https://github.com/apache/doris/issues/35340 for details.
    int32_t max_column_reader_num = state->query_options().max_column_reader_num;
    int32_t max_column_reader_num = _state->query_options().max_column_reader_num;
    if (_max_thread_num != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = _output_tuple_desc->slots().size();
        int32_t current_column_num = scan_column_num * _max_thread_num;

@@ -109,43 +157,15 @@ ScannerContext::ScannerContext(
            if (new_max_thread_num < _max_thread_num) {
                int32_t origin_max_thread_num = _max_thread_num;
                _max_thread_num = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(state->query_id())
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_thread_num << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    _query_thread_context = {_query_id, _state->query_mem_tracker(),
                             _state->get_query_ctx()->workload_group()};
    _dependency = dependency;
}

// After init function call, should not access _parent
Status ScannerContext::init() {
    _scanner_profile = _local_state->_scanner_profile;
    _scanner_sched_counter = _local_state->_scanner_sched_counter;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
    _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
    _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;

#ifndef BE_TEST
    // 3. get thread token
    if (_state->get_query_ctx()) {
        thread_token = _state->get_query_ctx()->get_token();
        _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
        if (_simple_scan_scheduler) {
            _should_reset_thread_name = false;
        }
        _remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
    }
#endif

    COUNTER_SET(_local_state->_max_scanner_thread_num, (int64_t)_max_thread_num);
    _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
                                                    thread_token == nullptr ? "False" : "True");

    // submit `_max_thread_num` running scanners to `ScannerScheduler`
    // When a running scanner is finished, it will submit one of the remaining scanners.
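As an aid to reading the sizing logic above, here is a small standalone sketch (not part of the patch; all constants, variable names, and the main() harness are illustrative) that reproduces the arithmetic described in the comments: the queue-memory floor with the wide-table multiplier, the per-instance _max_thread_num for the 64-core example, and the optional column-reader downgrade.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

// Standalone sketch of the sizing arithmetic described in the diff comments.
// All inputs below are illustrative defaults, not values read from Doris.
int main() {
    // --- Queue memory per scan instance ---
    int64_t scan_queue_mem_limit = 100LL * 1024 * 1024; // FE default: 100MB
    // The patch floors this value; the inline review comment notes the literal below
    // evaluates to 10GB rather than the intended 10MB (fixed in a later commit).
    int64_t max_bytes_in_queue =
            std::max(scan_queue_mem_limit, (int64_t)1024 * 1024 * 1024 * 10);
    int64_t slot_count = 350;                   // a moderately wide table (illustrative)
    max_bytes_in_queue *= slot_count / 300 + 1; // grow in steps of 300 columns

    // --- Per-instance scanner concurrency: the 64-core example from the comments ---
    int scanner_thread_pool_thread_num = 128;   // 64 cores * 2
    int num_parallel_instances = 32;            // 64 / 2
    int num_scanners = 1000;                    // tablets to read (illustrative)

    int max_thread_num = scanner_thread_pool_thread_num / num_parallel_instances; // 128/32 = 4
    max_thread_num = std::max(max_thread_num, 1);
    max_thread_num = std::min(max_thread_num, num_scanners);

    // --- Optional downgrade for very wide tables (see issue #35340) ---
    int max_column_reader_num = 20000; // illustrative limit
    if (max_thread_num != 1 && max_column_reader_num > 0 &&
        slot_count * max_thread_num > max_column_reader_num) {
        max_thread_num = std::max<int>(max_column_reader_num / (int)slot_count, 1);
    }

    // With 32 instances per ScanNode: 4 * 32 = 128 tasks submitted at a time, so a query
    // with two such ScanNodes can have 256 tasks: 128 running and 128 waiting in the queue.
    std::printf("max_bytes_in_queue   = %lld bytes\n", (long long)max_bytes_in_queue);
    std::printf("per-instance threads = %d\n", max_thread_num);
    std::printf("per-ScanNode tasks   = %d\n", max_thread_num * num_parallel_instances);
    return 0;
}
```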
Review comment (translated): Maybe pass this parameter to the scanner context as a constructor argument instead. That would make it easier for you to pick this change into 2.1; otherwise, with the multiple pipeline models in 2.1, the code is hard to pick.
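A minimal sketch of what this suggestion could look like, assuming the parameter in question is the ignore_data_distribution flag that this patch passes to init(); the class name, signatures, and harness below are illustrative, not the actual Doris code.

```cpp
#include <cstdio>

// Illustrative sketch only: the flag is passed through the constructor instead of init(),
// which keeps init()'s signature unchanged and may make backporting simpler.
class ScannerContextSketch {
public:
    explicit ScannerContextSketch(bool ignore_data_distribution)
            : _ignore_data_distribution(ignore_data_distribution) {}

    void init(int query_parallel_instance_num) {
        int num_parallel_instances = query_parallel_instance_num;
        if (_ignore_data_distribution) {
            // Treat the scan operator as a single instance, as in the NOTE comment above.
            num_parallel_instances = 1;
        }
        std::printf("num_parallel_instances = %d\n", num_parallel_instances);
    }

private:
    const bool _ignore_data_distribution;
};

int main() {
    ScannerContextSketch ctx(/*ignore_data_distribution=*/true);
    ctx.init(/*query_parallel_instance_num=*/32); // prints 1
    return 0;
}
```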