diff --git a/dbms/src/DataStreams/BlockStreamProfileInfo.h b/dbms/src/DataStreams/BlockStreamProfileInfo.h index 578b0ed5b26..e546dd20e2a 100644 --- a/dbms/src/DataStreams/BlockStreamProfileInfo.h +++ b/dbms/src/DataStreams/BlockStreamProfileInfo.h @@ -29,6 +29,11 @@ struct BlockStreamProfileInfo size_t rows = 0; size_t blocks = 0; size_t bytes = 0; + // execution time is the total time spent on current stream and all its children streams + // note that it is different from total_stopwatch.elapsed(), which includes not only the + // time spent on current stream and all its children streams, but also the time of its + // parent streams + UInt64 execution_time = 0; using BlockStreamProfileInfos = std::vector; @@ -45,6 +50,8 @@ struct BlockStreamProfileInfo void update(Block & block); + void updateExecutionTime(UInt64 time) { execution_time += time; } + /// Binary serialization and deserialization of main fields. /// Writes only main fields i.e. fields that required by internal transmission protocol. void read(ReadBuffer & in); diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index 09eeff2225c..09bf609833f 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -42,6 +42,8 @@ Block IProfilingBlockInputStream::read() if (isCancelledOrThrowIfKilled()) return res; + auto start_time = info.total_stopwatch.elapsed(); + if (!checkTimeLimit()) limit_exceeded_need_break = true; @@ -83,6 +85,7 @@ Block IProfilingBlockInputStream::read() } #endif + info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time); return res; } diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index b5f72738ab0..034139706ef 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -78,7 +78,7 @@ try { if (auto * p_stream = dynamic_cast(streamPtr.get())) { - time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().total_stopwatch.elapsed()); + time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().execution_time); num_produced_rows += p_stream->getProfileInfo().rows; num_iterations += p_stream->getProfileInfo().blocks; } diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index ba6f9e67748..64d39ed3728 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -269,14 +269,8 @@ void InterpreterDAG::executeAggregation( } else { - BlockInputStreams inputs; - if (!pipeline.streams.empty()) - inputs.push_back(pipeline.firstStream()); - else - pipeline.streams.resize(1); - pipeline.firstStream() - = std::make_shared(std::make_shared(inputs), params, true); + = std::make_shared(pipeline.firstStream(), params, true); } // add cast }