Skip to content

Commit

Permalink
Add profile to analyze the performance (apache#20)
Browse files Browse the repository at this point in the history
Change-Id: Ia7640db1556fc1255b8fe3e1134b8c8a80c72dd9

Co-authored-by: lihaopeng <[email protected]>
  • Loading branch information
HappenLee and lihaopeng committed Aug 10, 2021
1 parent db90111 commit 73a840a
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 11 deletions.
6 changes: 6 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ void OlapScanNode::_init_counter(RuntimeState* state) {
_scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime");
// time of scan thread to wait for worker thread of the thread pool
_scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime");

// time of node to wait for batch/block queue
_olap_wait_batch_queue_timer = ADD_TIMER(_runtime_profile, "BatchQueueWaitTime");
// time of row cursor to convert batch/block
_row_cursor_convert_timer = ADD_TIMER(_runtime_profile, "RowCursorConvertTime");
}

Status OlapScanNode::prepare(RuntimeState* state) {
Expand Down Expand Up @@ -286,6 +291,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
RowBatch* materialized_batch = NULL;
{
std::unique_lock<std::mutex> l(_row_batches_lock);
SCOPED_TIMER(_olap_wait_batch_queue_timer);
while (_materialized_row_batches.empty() && !_transfer_done) {
if (state->is_cancelled()) {
_transfer_done = true;
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "util/progress_updater.h"
#include "util/spinlock.h"

#include "vec/exec/volap_scanner.h"

namespace doris {
class IRuntimeFilter;

Expand Down Expand Up @@ -201,6 +203,7 @@ class OlapScanNode : public ScanNode {
RuntimeProfile* profile);

friend class OlapScanner;
friend class doris::vectorized::VOlapScanner;

// Tuple id resolved in prepare() to set _tuple_desc;
TupleId _tuple_id;
Expand Down Expand Up @@ -362,6 +365,9 @@ class OlapScanNode : public ScanNode {

RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;

RuntimeProfile::Counter* _olap_wait_batch_queue_timer = nullptr;
RuntimeProfile::Counter* _row_cursor_convert_timer = nullptr;
};

} // namespace doris
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
}

void OlapScanner::_convert_row_to_tuple(Tuple* tuple) {
SCOPED_TIMER(_parent->_row_cursor_convert_timer);
size_t slots_size = _query_slots.size();
for (int i = 0; i < slots_size; ++i) {
SlotDescriptor* slot_desc = _query_slots[i];
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/exec/vexchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
}

// Fetches the next block from the stream receiver, charging the elapsed time
// to this node's total-time counter and updating the rows-returned counter.
//   state: runtime state of the calling fragment (not read in this function).
//   block: output block handed to the receiver; when null, no rows are counted.
//   eos:   forwarded to the receiver (presumably set once the stream is
//          exhausted -- confirm against VDataStreamRecvr::get_next).
// Returns the status reported by the stream receiver.
Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
    SCOPED_TIMER(runtime_profile()->total_time_counter());
    // Hold on to the receiver's status instead of returning it right away:
    // an early return here would make the profile accounting below unreachable.
    auto status = _stream_recvr->get_next(block, eos);
    if (block != nullptr) {
        _num_rows_returned += block->rows();
        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
    }
    return status;
}

Status VExchangeNode::close(RuntimeState* state) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/volap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
Block* materialized_block = NULL;
{
std::unique_lock<std::mutex> l(_blocks_lock);
SCOPED_TIMER(_olap_wait_batch_queue_timer);
while (_materialized_blocks.empty() && !_transfer_done) {
if (state->is_cancelled()) {
_transfer_done = true;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/volap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo
}

void VOlapScanner::_convert_row_to_block(std::vector<vectorized::MutableColumnPtr>* columns) {
SCOPED_TIMER(_parent->_row_cursor_convert_timer);
size_t slots_size = _query_slots.size();
for (int i = 0; i < slots_size; ++i) {
SlotDescriptor* slot_desc = _query_slots[i];
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/volap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class OLAPReader;
class RuntimeProfile;
class Field;
class RowBatch;

namespace vectorized {
class VOlapScanNode;

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exprs/vcast_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, const doris::RowDes
_expr_name = fmt::format("(CAST {}, TO {})", child_name, _target_data_type_name);
return Status::OK();
}

// Opens this cast expression. All work is delegated to VExpr::open, whose
// status (OK or error) is returned to the caller unchanged.
doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context) {
    return VExpr::open(state, context);
}

// Closes this cast expression; the call is forwarded as-is to VExpr::close.
void VCastExpr::close(doris::RuntimeState* state, VExprContext* context) {
    VExpr::close(state, context);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ VDataStreamRecvr::~VDataStreamRecvr() {
}

Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& ordering_expr, const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, const size_t batch_size, int64_t limit, size_t offset) {
const std::vector<bool>& nulls_first, size_t batch_size, int64_t limit, size_t offset) {
DCHECK(_is_merging);
vector<BlockSupplier> child_block_suppliers;
// Create the merger that will produce a single stream of sorted rows.
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class VDataStreamRecvr {
~VDataStreamRecvr();

Status create_merger(const std::vector<VExprContext*>& ordering_expr, const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, const size_t batch_size, int64_t limit, size_t offset);
const std::vector<bool>& nulls_first, size_t batch_size, int64_t limit, size_t offset);

void add_batch(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
Expand Down Expand Up @@ -72,7 +72,6 @@ class VDataStreamRecvr {
return _num_buffered_bytes + batch_size > _total_buffer_limit;
}

// TODO:
// DataStreamMgr instance used to create this recvr. (Not owned)
VDataStreamMgr* _mgr;

Expand Down
16 changes: 11 additions & 5 deletions be/src/vec/runtime/vsorted_run_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ VSortedRunMerger::VSortedRunMerger(const std::vector<VExprContext *>& ordering_e
:_ordering_expr(ordering_expr), _is_asc_order(is_asc_order), _nulls_first(nulls_first), _batch_size(batch_size),
_limit(limit), _offset(offset){
_get_next_timer = ADD_TIMER(profile, "MergeGetNext");
_get_next_batch_timer = ADD_TIMER(profile, "MergeGetNextBatch");
_get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
}

Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs, bool parallel) {
Expand All @@ -60,6 +60,7 @@ Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs, bool p
}

Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
ScopedTimer<MonotonicStopWatch> timer(_get_next_timer);
// Only have one receive data queue of data, no need to do merge and
// copy the data of block.
// return the data in receive data directly
Expand All @@ -68,7 +69,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
while (_offset != 0 && current->block_ptr() != nullptr) {
if (_offset >= current->rows - current->pos) {
_offset -= (current->rows - current->pos);
current->has_next_block();
has_next_block(current);
} else {
current->pos += _offset;
_offset = 0;
Expand All @@ -78,7 +79,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
if (current->isFirst()) {
if (current->block_ptr() != nullptr) {
current->block_ptr()->swap(*output_block);
*eos = !current->has_next_block();
*eos = !has_next_block(current);
} else {
*eos = true;
}
Expand All @@ -90,7 +91,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
current->rows - current->pos);
}
current->block_ptr()->swap(*output_block);
*eos = !current->has_next_block();
*eos = !has_next_block(current);
} else {
*eos = true;
}
Expand Down Expand Up @@ -139,9 +140,14 @@ void VSortedRunMerger::next_heap(SortCursor& current) {
if (!current->isLast()) {
current->next();
_priority_queue.push(current);
} else if (current->has_next_block()) {
} else if (has_next_block(current)) {
_priority_queue.push(current);
}
}

// Forwards to the cursor's has_next_block() and returns its result. The call
// is wrapped in a scoped timer so its duration is added to
// _get_next_block_timer.
inline bool VSortedRunMerger::has_next_block(doris::vectorized::SortCursor& current) {
    ScopedTimer<MonotonicStopWatch> block_fetch_timer(_get_next_block_timer);
    const bool more_blocks = current->has_next_block();
    return more_blocks;
}

} // namespace doris
5 changes: 3 additions & 2 deletions be/src/vec/runtime/vsorted_run_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Block;
// an input batch is processed.
class VSortedRunMerger {
public:
// Function that returns the next batch of rows from an input sorted run. The batch
// Function that returns the next block of rows from an input sorted run. The batch
// is owned by the supplier (i.e. not VSortedRunMerger). eos is indicated by a NULL
// batch being returned.
VSortedRunMerger(const std::vector<VExprContext *>& ordering_expr, const std::vector<bool>& _is_asc_order,
Expand Down Expand Up @@ -87,10 +87,11 @@ class VSortedRunMerger {
RuntimeProfile::Counter *_get_next_timer;

// Times calls to get the next block of rows from the input run.
RuntimeProfile::Counter *_get_next_batch_timer;
RuntimeProfile::Counter *_get_next_block_timer;

private:
void next_heap(SortCursor& current);
inline bool has_next_block(SortCursor& current);
};

}
Expand Down

0 comments on commit 73a840a

Please sign in to comment.