Skip to content

Commit

Permalink
[Refactor](exec) refactor the partion sort code (apache#46262)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

do refactor the partion sort code remove the unless code

Problem Summary:

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [x] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
apache/doris-website#1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
  • Loading branch information
HappenLee authored Jan 3, 2025
1 parent 0163d89 commit 073fd68
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 52 deletions.
52 changes: 28 additions & 24 deletions be/src/pipeline/common/partition_sort_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,35 @@
namespace doris {

Status PartitionBlocks::append_block_by_selector(const vectorized::Block* input_block, bool eos) {
if (_blocks.empty() || reach_limit()) {
_init_rows = _partition_sort_info->_runtime_state->batch_size();
_blocks.push_back(vectorized::Block::create_unique(
vectorized::VectorizedUtils::create_empty_block(_partition_sort_info->_row_desc)));
}
auto columns = input_block->get_columns();
auto mutable_columns = _blocks.back()->mutate_columns();
DCHECK(columns.size() == mutable_columns.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
columns[i]->append_data_by_selector(mutable_columns[i], _selector);
}
_blocks.back()->set_columns(std::move(mutable_columns));
auto selector_rows = _selector.size();
_init_rows = _init_rows - selector_rows;
_total_rows = _total_rows + selector_rows;
_current_input_rows = _current_input_rows + selector_rows;
_selector.clear();
// maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
if (!eos && _partition_sort_info->_partition_inner_limit != -1 &&
_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD &&
_partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) {
create_or_reset_sorter_state();
RETURN_IF_ERROR(do_partition_topn_sort());
_current_input_rows = 0; // reset record
_do_partition_topn_count++;

if (selector_rows) {
if (_blocks.empty() || reach_limit()) {
_init_rows = _partition_sort_info->_runtime_state->batch_size();
_blocks.push_back(vectorized::Block::create_unique(
vectorized::VectorizedUtils::create_empty_block(
_partition_sort_info->_row_desc)));
}
auto columns = input_block->get_columns();
auto mutable_columns = _blocks.back()->mutate_columns();
DCHECK(columns.size() == mutable_columns.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
columns[i]->append_data_by_selector(mutable_columns[i], _selector);
}
_blocks.back()->set_columns(std::move(mutable_columns));
_init_rows = _init_rows - selector_rows;
_total_rows = _total_rows + selector_rows;
_current_input_rows = _current_input_rows + selector_rows;
_selector.clear();
// maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
if (!eos && _partition_sort_info->_partition_inner_limit != -1 &&
_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD &&
_partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) {
create_or_reset_sorter_state();
RETURN_IF_ERROR(do_partition_topn_sort());
_current_input_rows = 0; // reset record
_do_partition_topn_count++;
}
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope
_pool(pool),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_limit(tnode.limit),
_partition_exprs_num(cast_set<int>(tnode.partition_sort_node.partition_exprs.size())),
_partition_exprs_num(static_cast<int>(tnode.partition_sort_node.partition_exprs.size())),
_topn_phase(tnode.partition_sort_node.ptopn_phase),
_has_global_limit(tnode.partition_sort_node.has_global_limit),
_top_n_algorithm(tnode.partition_sort_node.top_n_algorithm),
Expand Down
45 changes: 23 additions & 22 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,45 +41,46 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
output_block->clear_column_data();

auto get_data_from_blocks_buffer = false;
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
if (local_state._shared_state->blocks_buffer.empty() == false) {
get_data_from_blocks_buffer = !local_state._shared_state->blocks_buffer.empty();
if (get_data_from_blocks_buffer) {
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
//if buffer have no data and sink not eos, block reading and wait for signal again
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, output_block, output_block->columns()));

if (local_state._shared_state->blocks_buffer.empty() &&
local_state._shared_state->sink_eos == false) {
!local_state._shared_state->sink_eos) {
// add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos.
// so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
if (local_state._shared_state->sink_eos == false) {
if (!local_state._shared_state->sink_eos) {
local_state._dependency->block();
}
}
if (!output_block->empty()) {
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
}
}

// is_ready_for_read: this is set by sink node using: local_state._dependency->set_ready_for_read()
// notice: must output block from _blocks_buffer firstly, and then get_sorted_block.
// as when the child is eos, then set _can_read = true, and _partition_sorts have push_back sorter.
// if we move the _blocks_buffer output at last(behind 286 line),
// it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
output_block->columns()));
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
if (!get_data_from_blocks_buffer) {
// is_ready_for_read: this is set by sink node using: local_state._dependency->set_ready_for_read()
// notice: must output block from _blocks_buffer firstly, and then get_sorted_block.
// as when the child is eos, then set _can_read = true, and _partition_sorts have push_back sorter.
// if we move the _blocks_buffer output at last(behind 286 line),
// it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);

*eos = local_state._shared_state->blocks_buffer.empty() &&
local_state._sort_idx >= local_state._shared_state->partition_sorts.size();
*eos = local_state._shared_state->blocks_buffer.empty() &&
local_state._sort_idx >= local_state._shared_state->partition_sorts.size();
}
}

if (!output_block->empty()) {
//if buffer have no data and sink not eos, block reading and wait for signal again
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
output_block->columns()));
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
Expand Down
6 changes: 1 addition & 5 deletions be/src/vec/common/sort/partition_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Status PartitionSorter::prepare_for_read() {
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
for (auto& block : blocks) {
priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
priority_queue.emplace(MergeSortCursorImpl::create_shared(block, _sort_description));
}
blocks.clear();
return Status::OK();
Expand Down Expand Up @@ -102,10 +102,6 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
auto& priority_queue = _state->get_priority_queue();
if (priority_queue.empty()) {
*eos = true;
return Status::OK();
}
const auto& sorted_block = priority_queue.top().impl->block;
size_t num_columns = sorted_block->columns();
MutableBlock m_block =
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/common/sort/partition_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct SortCursorCmp {
}
return true;
}

int row = 0;
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
};
Expand Down

0 comments on commit 073fd68

Please sign in to comment.