Skip to content

Commit

Permalink
improve dag execution time collection (#202)
Browse files Browse the repository at this point in the history
* improve dag execution time collection

* address comment

* update comments

* update comment

* update comment
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 26, 2019
1 parent e8b4198 commit 61cdc8f
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 8 deletions.
7 changes: 7 additions & 0 deletions dbms/src/DataStreams/BlockStreamProfileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct BlockStreamProfileInfo
size_t rows = 0;
size_t blocks = 0;
size_t bytes = 0;
// execution time is the total time spent on current stream and all its children streams
// note that it is different from total_stopwatch.elapsed(), which includes not only the
// time spent on current stream and all its children streams, but also the time of its
// parent streams
UInt64 execution_time = 0;

using BlockStreamProfileInfos = std::vector<const BlockStreamProfileInfo *>;

Expand All @@ -45,6 +50,8 @@ struct BlockStreamProfileInfo

void update(Block & block);

void updateExecutionTime(UInt64 time) { execution_time += time; }

/// Binary serialization and deserialization of main fields.
/// Writes only main fields i.e. fields that required by internal transmission protocol.
void read(ReadBuffer & in);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataStreams/IProfilingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Block IProfilingBlockInputStream::read()
if (isCancelledOrThrowIfKilled())
return res;

auto start_time = info.total_stopwatch.elapsed();

if (!checkTimeLimit())
limit_exceeded_need_break = true;

Expand Down Expand Up @@ -83,6 +85,7 @@ Block IProfilingBlockInputStream::read()
}
#endif

info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
return res;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ try
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streamPtr.get()))
{
time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().total_stopwatch.elapsed());
time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().execution_time);
num_produced_rows += p_stream->getProfileInfo().rows;
num_iterations += p_stream->getProfileInfo().blocks;
}
Expand Down
8 changes: 1 addition & 7 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,8 @@ void InterpreterDAG::executeAggregation(
}
else
{
BlockInputStreams inputs;
if (!pipeline.streams.empty())
inputs.push_back(pipeline.firstStream());
else
pipeline.streams.resize(1);

pipeline.firstStream()
= std::make_shared<AggregatingBlockInputStream>(std::make_shared<ConcatBlockInputStream>(inputs), params, true);
= std::make_shared<AggregatingBlockInputStream>(pipeline.firstStream(), params, true);
}
// add cast
}
Expand Down

0 comments on commit 61cdc8f

Please sign in to comment.