From 73a840aedf8feb77e6be442e3c191bb10a4a3d01 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 27 Apr 2021 20:39:32 +0800 Subject: [PATCH] Add profile to analyze the performance (#20) Change-Id: Ia7640db1556fc1255b8fe3e1134b8c8a80c72dd9 Co-authored-by: lihaopeng --- be/src/exec/olap_scan_node.cpp | 6 ++++++ be/src/exec/olap_scan_node.h | 6 ++++++ be/src/exec/olap_scanner.cpp | 1 + be/src/vec/exec/vexchange_node.cpp | 8 +++++++- be/src/vec/exec/volap_scan_node.cpp | 1 + be/src/vec/exec/volap_scanner.cpp | 1 + be/src/vec/exec/volap_scanner.h | 1 + be/src/vec/exprs/vcast_expr.cpp | 2 ++ be/src/vec/runtime/vdata_stream_recvr.cpp | 2 +- be/src/vec/runtime/vdata_stream_recvr.h | 3 +-- be/src/vec/runtime/vsorted_run_merger.cpp | 16 +++++++++++----- be/src/vec/runtime/vsorted_run_merger.h | 5 +++-- 12 files changed, 41 insertions(+), 11 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 636a7e6277a6ff..742d05e4541359 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -157,6 +157,11 @@ void OlapScanNode::_init_counter(RuntimeState* state) { _scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime"); // time of scan thread to wait for worker thread of the thread pool _scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime"); + + // time of node to wait for batch/block queue + _olap_wait_batch_queue_timer = ADD_TIMER(_runtime_profile, "BatchQueueWaitTime"); + // time of row cursor to convert batch/block + _row_cursor_convert_timer = ADD_TIMER(_runtime_profile, "RowCursorConvertTime"); } Status OlapScanNode::prepare(RuntimeState* state) { @@ -286,6 +291,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo RowBatch* materialized_batch = NULL; { std::unique_lock l(_row_batches_lock); + SCOPED_TIMER(_olap_wait_batch_queue_timer); while (_materialized_row_batches.empty() && !_transfer_done) { if (state->is_cancelled()) { _transfer_done = true; diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 436b14e5140728..4b8931b6f8218b 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -35,6 +35,8 @@ #include "util/progress_updater.h" #include "util/spinlock.h" +#include "vec/exec/volap_scanner.h" + namespace doris { class IRuntimeFilter; @@ -201,6 +203,7 @@ class OlapScanNode : public ScanNode { RuntimeProfile* profile); friend class OlapScanner; + friend class doris::vectorized::VOlapScanner; // Tuple id resolved in prepare() to set _tuple_desc; TupleId _tuple_id; @@ -362,6 +365,9 @@ class OlapScanNode : public ScanNode { RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; + + RuntimeProfile::Counter* _olap_wait_batch_queue_timer = nullptr; + RuntimeProfile::Counter* _row_cursor_convert_timer = nullptr; }; } // namespace doris diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index ebe2200755f007..3e9482b12a5e1c 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -424,6 +424,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { } void OlapScanner::_convert_row_to_tuple(Tuple* tuple) { + SCOPED_TIMER(_parent->_row_cursor_convert_timer); size_t slots_size = _query_slots.size(); for (int i = 0; i < slots_size; ++i) { SlotDescriptor* slot_desc = _query_slots[i]; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index c42c2f5b7d83b5..6223885488f3d8 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -58,7 +58,13 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e } Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { - return _stream_recvr->get_next(block, eos); + SCOPED_TIMER(runtime_profile()->total_time_counter()); + auto status = _stream_recvr->get_next(block, eos); + if (block != nullptr) { + _num_rows_returned += block->rows(); + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + } + return status; } Status VExchangeNode::close(RuntimeState* state) { diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index d15ea901d52b8d..580b8a6182b315 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -459,6 +459,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { Block* materialized_block = NULL; { std::unique_lock l(_blocks_lock); + SCOPED_TIMER(_olap_wait_batch_queue_timer); while (_materialized_blocks.empty() && !_transfer_done) { if (state->is_cancelled()) { _transfer_done = true; diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 1564d73bac7767..b8921e905e2cd9 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -99,6 +99,7 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo } void VOlapScanner::_convert_row_to_block(std::vector* columns) { + SCOPED_TIMER(_parent->_row_cursor_convert_timer); size_t slots_size = _query_slots.size(); for (int i = 0; i < slots_size; ++i) { SlotDescriptor* slot_desc = _query_slots[i]; diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index 572d182def2723..0d6cb81509fad3 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -25,6 +25,7 @@ class OLAPReader; class RuntimeProfile; class Field; class RowBatch; + namespace vectorized { class VOlapScanNode; diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index 90dca8bfea4bf7..aebcd7f3fc24d5 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -56,10 +56,12 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, const doris::RowDes _expr_name = fmt::format("(CAST {}, TO {})", child_name, _target_data_type_name); return Status::OK(); } + doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context) { RETURN_IF_ERROR(VExpr::open(state, context)); return Status::OK(); } + void VCastExpr::close(doris::RuntimeState* state, VExprContext* context) { VExpr::close(state, context); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index b1798dc383842b..487039746f2c62 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -223,7 +223,7 @@ VDataStreamRecvr::~VDataStreamRecvr() { } Status VDataStreamRecvr::create_merger(const std::vector& ordering_expr, const std::vector& is_asc_order, - const std::vector& nulls_first, const size_t batch_size, int64_t limit, size_t offset) { + const std::vector& nulls_first, size_t batch_size, int64_t limit, size_t offset) { DCHECK(_is_merging); vector child_block_suppliers; // Create the merger that will a single stream of sorted rows. diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 565819c3f7596e..e5dc3a097d8c0e 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -40,7 +40,7 @@ class VDataStreamRecvr { ~VDataStreamRecvr(); Status create_merger(const std::vector& ordering_expr, const std::vector& is_asc_order, - const std::vector& nulls_first, const size_t batch_size, int64_t limit, size_t offset); + const std::vector& nulls_first, size_t batch_size, int64_t limit, size_t offset); void add_batch(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done); @@ -72,7 +72,6 @@ class VDataStreamRecvr { return _num_buffered_bytes + batch_size > _total_buffer_limit; } - // TODO: // DataStreamMgr instance used to create this recvr. (Not owned) VDataStreamMgr* _mgr; diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 690e1b91c1bf4e..6f8955bfc6c6d2 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -37,7 +37,7 @@ VSortedRunMerger::VSortedRunMerger(const std::vector& ordering_e :_ordering_expr(ordering_expr), _is_asc_order(is_asc_order), _nulls_first(nulls_first), _batch_size(batch_size), _limit(limit), _offset(offset){ _get_next_timer = ADD_TIMER(profile, "MergeGetNext"); - _get_next_batch_timer = ADD_TIMER(profile, "MergeGetNextBatch"); + _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock"); } Status VSortedRunMerger::prepare(const vector& input_runs, bool parallel) { @@ -60,6 +60,7 @@ Status VSortedRunMerger::prepare(const vector& input_runs, bool p } Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { + ScopedTimer timer(_get_next_timer); // Only have one receive data queue of data, no need to do merge and // copy the data of block. // return the data in receive data directly @@ -68,7 +69,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { while (_offset != 0 && current->block_ptr() != nullptr) { if (_offset >= current->rows - current->pos) { _offset -= (current->rows - current->pos); - current->has_next_block(); + has_next_block(current); } else { current->pos += _offset; _offset = 0; @@ -78,7 +79,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { if (current->isFirst()) { if (current->block_ptr() != nullptr) { current->block_ptr()->swap(*output_block); - *eos = !current->has_next_block(); + *eos = !has_next_block(current); } else { *eos = true; } @@ -90,7 +91,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { current->rows - current->pos); } current->block_ptr()->swap(*output_block); - *eos = !current->has_next_block(); + *eos = !has_next_block(current); } else { *eos = true; } @@ -139,9 +140,14 @@ void VSortedRunMerger::next_heap(SortCursor& current) { if (!current->isLast()) { current->next(); _priority_queue.push(current); - } else if (current->has_next_block()) { + } else if (has_next_block(current)) { _priority_queue.push(current); } } +inline bool VSortedRunMerger::has_next_block(doris::vectorized::SortCursor ¤t) { + ScopedTimer timer(_get_next_block_timer); + return current->has_next_block(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 8f49ff4c8a72ce..4d1942f0b5a0a3 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -44,7 +44,7 @@ class Block; // an input batch is processed. class VSortedRunMerger { public: - // Function that returns the next batch of rows from an input sorted run. The batch + // Function that returns the next block of rows from an input sorted run. The batch // is owned by the supplier (i.e. not VSortedRunMerger). eos is indicated by an NULL // batch being returned. VSortedRunMerger(const std::vector& ordering_expr, const std::vector& _is_asc_order, @@ -87,10 +87,11 @@ class VSortedRunMerger { RuntimeProfile::Counter *_get_next_timer; // Times calls to get the next batch of rows from the input run. - RuntimeProfile::Counter *_get_next_batch_timer; + RuntimeProfile::Counter *_get_next_block_timer; private: void next_heap(SortCursor& current); + inline bool has_next_block(SortCursor& current); }; }