Skip to content

Commit

Permalink
[Sort] Support merge sort in ExchangeNode (apache#19)
Browse files Browse the repository at this point in the history
Change-Id: I5f15771153fe8e982c78c581c42d5b1e25ab543f

Co-authored-by: lihaopeng <[email protected]>
  • Loading branch information
HappenLee and lihaopeng committed Jul 1, 2021
1 parent 2d29502 commit ab37702
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 26 deletions.
1 change: 1 addition & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ set(VEC_FILES
runtime/vdata_stream_recvr.cpp
runtime/vdata_stream_mgr.cpp
runtime/vpartition_info.cpp
runtime/vsorted_run_merger.cpp
)

add_library(Vec STATIC
Expand Down
63 changes: 49 additions & 14 deletions be/src/vec/core/sort_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "vec/core/column_numbers.h"
#include "vec/columns/column.h"
#include "vec/columns/column_string.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vdata_stream_recvr.h"

namespace doris::vectorized
Expand Down Expand Up @@ -86,35 +87,69 @@ struct SortCursorImpl
: column_desc.column_number;
sort_columns.push_back(columns[column_number].get());

need_collation[j] = desc[j].collator != nullptr && typeid_cast<const ColumnString *>(sort_columns.back()); /// TODO Nullable(String)
has_collation |= need_collation[j];
// need_collation[j] = desc[j].collator != nullptr && typeid_cast<const ColumnString *>(sort_columns.back()); /// TODO Nullable(String)
// has_collation |= need_collation[j];
}

pos = 0;
rows = all_columns[0]->size();
}

bool isFirst() const { return pos == 0; }
virtual bool isLast() { return pos + 1 >= rows; }
bool isLast() { return pos + 1 >= rows; }
void next() { ++pos; }

virtual bool has_next_block() { return false; }
virtual Block* block_ptr() { return nullptr; }
};

using BlockSupplier = std::function<Status(Block **)>;

struct ReceiveQueueSortCursorImpl : public SortCursorImpl {
ReceiveQueueSortCursorImpl() = default;
VDataStreamRecvr::SenderQueue* _sender_queue;

bool isLast() override {
if (pos + 1 >= rows) {
Block* block_ptr;
auto status = _sender_queue->get_batch(&block_ptr);
if (status.ok() && block_ptr != nullptr) {
SortCursorImpl::reset(*block_ptr);
} else {
return true;
ReceiveQueueSortCursorImpl(const BlockSupplier& block_supplier, const std::vector<VExprContext*>& ordering_expr, const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first):
SortCursorImpl(), _ordering_expr(ordering_expr), _block_supplier(block_supplier){
sort_columns_size = ordering_expr.size();

desc.resize(ordering_expr.size());
for (int i = 0; i < desc.size(); i++) {
desc[i].direction = is_asc_order[i] ? 1 : -1;
desc[i].nulls_direction = nulls_first[i] ? 1 : -1;
}
has_next_block();
}

bool has_next_block() override {
auto status = _block_supplier(&_block_ptr);
if (status.ok() && _block_ptr != nullptr) {
for (int i = 0; i < desc.size(); ++i) {
_ordering_expr[i]->execute(_block_ptr, &desc[i].column_number);
}
SortCursorImpl::reset(*_block_ptr);
return true;
}
_block_ptr = nullptr;
return false;
}

Block* block_ptr() override {
return _block_ptr;
}

size_t columns_num() const { return all_columns.size(); }

Block create_empty_blocks() const {
size_t num_columns = columns_num();
MutableColumns columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
columns[i] = all_columns[i]->cloneEmpty();
return _block_ptr->cloneWithColumns(std::move(columns));
}

const std::vector<VExprContext*>& _ordering_expr;
Block* _block_ptr = nullptr;
BlockSupplier _block_supplier{};
bool _is_eof = false;
};

/// For easy copying.
Expand Down
9 changes: 8 additions & 1 deletion be/src/vec/exec/vexchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace doris::vectorized {
// Constructs the vectorized exchange node from its thrift plan node.
// - _offset is taken from the plan node when the field is set, otherwise 0.
// - _is_merging is true when the plan node carries sort_info, i.e. the
//   exchange has to merge sorted sender streams.
VExchangeNode::VExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
        : ExecNode(pool, tnode, descs),
          _num_senders(0),
          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0),
          _is_merging(tnode.exchange_node.__isset.sort_info),
          _stream_recvr(nullptr) {
}

Expand Down Expand Up @@ -43,7 +44,13 @@ Status VExchangeNode::prepare(RuntimeState* state) {
// Opens the exchange node. For a merging exchange this also opens the sort
// expressions and asks the stream receiver to build its sorted-run merger.
Status VExchangeNode::open(RuntimeState* state) {
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_ERROR(ExecNode::open(state));

    // A non-merging exchange needs no further setup.
    if (!_is_merging) {
        return Status::OK();
    }

    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
    RETURN_IF_ERROR(_stream_recvr->create_merger(_vsort_exec_exprs.lhs_ordering_expr_ctxs(),
                                                 _is_asc_order, _nulls_first,
                                                 state->batch_size(), _limit, _offset));
    return Status::OK();
}
Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/vexchange_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class VExchangeNode : public ExecNode {
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;

// use in merge sort
size_t _offset;
VSortExecExprs _vsort_exec_exprs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/exec/vsort_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ Status VSortNode::open(RuntimeState* state) {
}

Status VSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
// return Status::NotSupported("Not Implemented VSortNode::get_next scalar");
// row_batch = nullptr;
row_batch = nullptr;
*eos = true;
return Status::OK();
return Status::NotSupported("Not Implemented VSortNode::get_next scalar");
}

Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
Expand Down
35 changes: 29 additions & 6 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
#include "gen_cpp/data.pb.h"
#include "runtime/mem_tracker.h"
#include "util/uid_util.h"

#include "vec/core/block.h"
#include "vec/core/sort_cursor.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/runtime/vsorted_run_merger.h"

namespace doris::vectorized {

Expand Down Expand Up @@ -219,6 +222,21 @@ VDataStreamRecvr::~VDataStreamRecvr() {
DCHECK(_mgr == nullptr) << "Must call close()";
}

// Creates the merger that turns the per-sender queues into a single stream of
// sorted rows, and prepares it with one block supplier per sender queue.
// Only valid on a merging receiver (checked by the DCHECK).
Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& ordering_expr,
                                       const std::vector<bool>& is_asc_order,
                                       const std::vector<bool>& nulls_first,
                                       const size_t batch_size, int64_t limit, size_t offset) {
    DCHECK(_is_merging);

    _merger = std::make_unique<VSortedRunMerger>(ordering_expr, is_asc_order, nulls_first,
                                                 batch_size, limit, offset, _profile);

    // Each supplier forwards to get_batch() of the queue it was created for.
    std::vector<BlockSupplier> suppliers;
    for (SenderQueue* queue : _sender_queues) {
        suppliers.emplace_back([queue](Block** block) { return queue->get_batch(block); });
    }

    RETURN_IF_ERROR(_merger->prepare(suppliers));
    return Status::OK();
}

void VDataStreamRecvr::add_batch(const PBlock& pblock, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done) {
int use_sender_id = _is_merging ? sender_id : 0;
Expand All @@ -229,12 +247,18 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
// TODO: use merge
block->clear();
Block* res = nullptr;
RETURN_IF_ERROR(_sender_queues[0]->get_batch(&res));
if (res != nullptr) {
*block = *res;

if (!_is_merging) {
RETURN_IF_ERROR(_sender_queues[0]->get_batch(&res));
if (res != nullptr) {
*block = *res;
} else {
*eos = true;
}
} else {
*eos = true;
RETURN_IF_ERROR(_merger->get_next(block, eos));
}

return Status::OK();
}

Expand All @@ -258,8 +282,7 @@ void VDataStreamRecvr::close() {
_mgr->deregister_recvr(fragment_instance_id(), dest_node_id());
_mgr = nullptr;

// TODO:
// _merger.reset();
_merger.reset();
// TODO: Maybe shared tracker doesn't need to be reset manually
_mem_tracker.reset();
}
Expand Down
9 changes: 7 additions & 2 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class PBlock;
namespace vectorized {
class Block;
class VDataStreamMgr;
class VSortedRunMerger;
class VExprContext;

class VDataStreamRecvr {
public:
VDataStreamRecvr(VDataStreamMgr* stream_mgr, const std::shared_ptr<MemTracker>& parent_tracker,
Expand All @@ -36,7 +39,8 @@ class VDataStreamRecvr {

~VDataStreamRecvr();

// TODO: merger
Status create_merger(const std::vector<VExprContext*>& ordering_expr, const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, const size_t batch_size, int64_t limit, size_t offset);

void add_batch(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
Expand Down Expand Up @@ -92,7 +96,8 @@ class VDataStreamRecvr {
std::shared_ptr<MemTracker> _mem_tracker;
std::vector<SenderQueue*> _sender_queues;

// TODO: declare merger
std::unique_ptr<VSortedRunMerger> _merger;

ObjectPool _sender_queue_pool;
RuntimeProfile* _profile;

Expand Down
147 changes: 147 additions & 0 deletions be/src/vec/runtime/vsorted_run_merger.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/runtime/vsorted_run_merger.h"

#include <vector>

#include "exprs/expr.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/sorter.h"
#include "runtime/tuple_row.h"
#include "util/debug_util.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"

using std::vector;

namespace doris::vectorized {

// Stores the sort description (ordering expressions, direction and null
// placement per key), the output batch size, and the limit/offset to apply to
// the merged stream, then registers the two merge timers on `profile`.
// A limit of -1 is treated as "no limit" by get_next().
VSortedRunMerger::VSortedRunMerger(const std::vector<VExprContext*>& ordering_expr,
                                   const std::vector<bool>& is_asc_order,
                                   const std::vector<bool>& nulls_first, const size_t batch_size,
                                   int64_t limit, size_t offset, RuntimeProfile* profile)
        : _ordering_expr(ordering_expr),
          _is_asc_order(is_asc_order),
          _nulls_first(nulls_first),
          _batch_size(batch_size),
          _limit(limit),
          _offset(offset) {
    _get_next_timer = ADD_TIMER(profile, "MergeGetNext");
    _get_next_batch_timer = ADD_TIMER(profile, "MergeGetNextBatch");
}

// Builds one cursor per input run and seeds the merge heap.
//
// input_runs: one block supplier per sorted input stream. Each cursor pulls
//             its first block from its supplier while being constructed.
// parallel:   currently unused.
//
// A cursor whose first fetch produced no block (its _block_ptr is null) has
// never been reset onto real columns. Such a cursor must not enter the heap,
// because the merge loop reads rows through the cursor's columns. It must
// also not be used as the layout template, because create_empty_blocks()
// dereferences _block_ptr. Only cursors that actually hold a block are used.
// If no cursor holds a block, the heap stays empty and get_next() reports eos
// on its first call.
Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs, bool parallel) {
    for (const auto& supplier : input_runs) {
        _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first);
    }

    // Cursor addresses are taken only after all cursors have been created, so
    // later emplace_back calls cannot invalidate them.
    bool have_layout = false;
    for (auto& cursor : _cursors) {
        if (cursor._is_eof || cursor._block_ptr == nullptr) {
            continue;
        }
        _priority_queue.push(SortCursor(&cursor));
        if (!have_layout) {
            // Empty block with the input's column layout; get_next() clones
            // it to build each merged output block.
            _empty_block = cursor.create_empty_blocks();
            have_layout = true;
        }
    }

    return Status::OK();
}

// Produces the next block of the merged stream into `output_block` and sets
// `*eos` to true once the stream is finished or the limit has been reached.
//
// Two paths:
//  * exactly one cursor left in the heap: hand its blocks through without
//    copying rows (after skipping any remaining offset);
//  * otherwise: pop rows from the heap in sort order and copy up to
//    _batch_size of them into a fresh block.
Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
    if (_priority_queue.size() == 1) {
        // Single input left: no merge needed, return its data directly.
        auto cursor = _priority_queue.top();

        // Consume the remaining offset, whole blocks first.
        while (_offset != 0 && cursor->block_ptr() != nullptr) {
            const auto remaining = cursor->rows - cursor->pos;
            if (_offset < remaining) {
                cursor->pos += _offset;
                _offset = 0;
            } else {
                _offset -= remaining;
                cursor->has_next_block();
            }
        }

        Block* pending = cursor->block_ptr();
        if (pending == nullptr) {
            *eos = true;
        } else {
            if (!cursor->isFirst()) {
                // Part of the block was already consumed: keep only the tail.
                for (size_t idx = 0; idx < cursor->all_columns.size(); ++idx) {
                    auto& entry = pending->getByPosition(idx);
                    entry.column = entry.column->cut(cursor->pos, cursor->rows - cursor->pos);
                }
            }
            pending->swap(*output_block);
            *eos = !cursor->has_next_block();
        }
    } else {
        const size_t num_columns = _empty_block.columns();
        MutableColumns merged_columns = _empty_block.cloneEmptyColumns();
        /// TODO: reserve (in each column)

        // Pop rows in sort order; rows covered by the offset are dropped.
        size_t merged_rows = 0;
        while (!_priority_queue.empty()) {
            auto smallest = _priority_queue.top();
            _priority_queue.pop();

            const bool skipping = _offset > 0;
            if (!skipping) {
                for (size_t col = 0; col < num_columns; ++col) {
                    merged_columns[col]->insertFrom(*smallest->all_columns[col], smallest->pos);
                }
                ++merged_rows;
            } else {
                _offset--;
            }

            // Advance the cursor and re-insert it if it still has rows.
            next_heap(smallest);
            if (merged_rows == _batch_size) {
                break;
            }
        }

        if (merged_rows == 0) {
            *eos = true;
            return Status::OK();
        }

        Block merged = _empty_block.cloneWithColumns(std::move(merged_columns));
        merged.swap(*output_block);
    }

    // Apply the limit: trim the rows that overshoot it and finish the stream.
    _num_rows_returned += output_block->rows();
    const bool limit_reached = _limit != -1 && _num_rows_returned >= _limit;
    if (limit_reached) {
        const auto overshoot = _num_rows_returned - _limit;
        output_block->set_num_rows(output_block->rows() - overshoot);
        *eos = true;
    }
    return Status::OK();
}

// Advances `current` by one row and puts it back into the heap. When the
// cursor is on the last row of its block, it is re-inserted only if fetching
// the next block succeeds; otherwise the cursor is dropped from the merge.
void VSortedRunMerger::next_heap(SortCursor& current) {
    if (current->isLast()) {
        if (!current->has_next_block()) {
            return;
        }
    } else {
        current->next();
    }
    _priority_queue.push(current);
}

} // namespace doris::vectorized
Loading

0 comments on commit ab37702

Please sign in to comment.