diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index 420e9c949b3..bf58fc31b38 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -120,7 +120,7 @@ void PipelineExecutor::cancel() String PipelineExecutor::toString() const { assert(root_pipeline); - return root_pipeline->toTreeString(); + return fmt::format("query concurrency: {}\n{}", context.getMaxStreams(), root_pipeline->toTreeString()); } int PipelineExecutor::estimateNewThreadCount() diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h index 99d45789cc7..ac44486476b 100644 --- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h +++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h @@ -14,6 +14,7 @@ #pragma once +#include #include namespace DB diff --git a/dbms/src/Flash/Statistics/ExecutorStatistics.h b/dbms/src/Flash/Statistics/ExecutorStatistics.h index 6c5c8f5921e..e5f1e616d38 100644 --- a/dbms/src/Flash/Statistics/ExecutorStatistics.h +++ b/dbms/src/Flash/Statistics/ExecutorStatistics.h @@ -58,11 +58,12 @@ class ExecutorStatistics : public ExecutorStatisticsBase [](const String & child, FmtBuffer & bf) { bf.fmtAppend(R"("{}")", child); }, ","); fmt_buffer.fmtAppend( - R"(],"outbound_rows":{},"outbound_blocks":{},"outbound_bytes":{},"outbound_allocated_bytes":{},"execution_time_ns":{})", + R"(],"outbound_rows":{},"outbound_blocks":{},"outbound_bytes":{},"outbound_allocated_bytes":{},"concurrency":{},"execution_time_ns":{})", base.rows, base.blocks, base.bytes, base.allocated_bytes, + base.concurrency, base.execution_time_ns); if constexpr (ExecutorImpl::has_extra_info) { diff --git a/dbms/src/Flash/Statistics/JoinImpl.cpp b/dbms/src/Flash/Statistics/JoinImpl.cpp index 190802bbb5b..4f97dc428cc 100644 --- a/dbms/src/Flash/Statistics/JoinImpl.cpp +++ b/dbms/src/Flash/Statistics/JoinImpl.cpp @@ -20,8 +20,9 @@ namespace DB void JoinStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const { fmt_buffer.fmtAppend( - R"("peak_build_bytes_usage":{},"build_side_child":"{}","is_spill_enabled":{},"is_spilled":{})" - R"("join_build_inbound_rows":{},"join_build_inbound_blocks":{},"join_build_inbound_bytes":{},"join_build_inbound_allocated_bytes":{}, "join_build_execution_time_ns":{})", + R"("peak_build_bytes_usage":{},"build_side_child":"{}","is_spill_enabled":{},"is_spilled":{},)" + R"("join_build_inbound_rows":{},"join_build_inbound_blocks":{},"join_build_inbound_bytes":{},)" + R"("join_build_inbound_allocated_bytes":{},"join_build_concurrency":{},"join_build_execution_time_ns":{})", peak_build_bytes_usage, build_side_child, is_spill_enabled, @@ -30,6 +31,7 @@ void JoinStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const join_build_base.blocks, join_build_base.bytes, join_build_base.allocated_bytes, + join_build_base.concurrency, join_build_base.execution_time_ns); } diff --git a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out index 6644946f072..e3b45df476d 100644 --- a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out +++ b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out @@ -1,119 +1,141 @@ ~test_suite_name: SimplePipeline ~result_index: 0 ~result: +query concurrency: 1 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Filter|selection_1 -> Projection|project_2 -> Limit|limit_3 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_4 @ ~test_suite_name: SimplePipeline ~result_index: 1 ~result: +query concurrency: 1 pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator @ ~test_suite_name: StrangeQuery ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> Filter|selection_1 -> Filter|selection_2 -> Filter|selection_3 -> Projection|NonTiDBOperator @ ~test_suite_name: StrangeQuery ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Limit|limit_2 -> Limit|limit_3 -> Projection|NonTiDBOperator @ ~test_suite_name: StrangeQuery ~result_index: 2 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> TopN|topn_2 -> TopN|topn_3 -> Projection|NonTiDBOperator @ ~test_suite_name: SingleQueryBlock ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> TopN|topn_4 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: SingleQueryBlock ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> Limit|limit_4 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 0 ~result: +query concurrency: 1 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 1 ~result: +query concurrency: 5 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 2 ~result: +query concurrency: 1 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 3 ~result: +query concurrency: 5 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 4 ~result: +query concurrency: 1 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 5 ~result: +query concurrency: 5 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 6 ~result: +query concurrency: 1 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 7 ~result: +query concurrency: 5 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 8 ~result: +query concurrency: 1 pipeline#0: MockTableScan|table_scan_0 -> Filter|selection_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 9 ~result: +query concurrency: 5 pipeline#0: MockTableScan|table_scan_0 -> Filter|selection_1 -> Projection|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 10 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 11 ~result: +query concurrency: 1 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 12 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 13 ~result: +query concurrency: 1 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 14 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: AggregationConvergent|aggregation_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator |- pipeline#2: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator @@ -121,6 +143,7 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator ~test_suite_name: ParallelQuery ~result_index: 15 ~result: +query concurrency: 1 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: AggregationConvergent|aggregation_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator |- pipeline#2: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator @@ -128,134 +151,159 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator ~test_suite_name: ParallelQuery ~result_index: 16 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 17 ~result: +query concurrency: 1 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ParallelQuery ~result_index: 18 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 @ ~test_suite_name: ParallelQuery ~result_index: 19 ~result: +query concurrency: 1 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 @ ~test_suite_name: ParallelQuery ~result_index: 20 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 @ ~test_suite_name: ParallelQuery ~result_index: 21 ~result: +query concurrency: 1 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2 @ ~test_suite_name: ParallelQuery ~result_index: 22 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_3 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_1 -> Limit|limit_2 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|project_2 -> Projection|project_3 -> Projection|NonTiDBOperator @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> Projection|NonTiDBOperator @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 2 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_4 -> Projection|project_5 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 3 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_4 -> Projection|project_5 -> Filter|selection_6 -> Projection|project_7 -> Limit|limit_8 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 4 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|project_1 -> Projection|project_2 -> Projection|project_3 -> Projection|NonTiDBOperator @ ~test_suite_name: MultipleQueryBlockWithSource ~result_index: 5 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|project_1 -> Projection|project_2 -> Projection|project_3 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_4 @ ~test_suite_name: Window ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|NonTiDBOperator @ ~test_suite_name: Window ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|project_3 -> Projection|NonTiDBOperator @ ~test_suite_name: Window ~result_index: 2 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> WindowSort|sort_1 -> Projection|project_2 -> Window|window_3 -> Projection|project_4 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffle ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffle ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffle ~result_index: 2 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffle ~result_index: 3 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffleJoin ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> Projection|NonTiDBOperator |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffleJoin ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> Projection|NonTiDBOperator |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffleAgg ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Aggregation|aggregation_1 -> Projection|NonTiDBOperator @ ~test_suite_name: FineGrainedShuffleAgg ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator |- pipeline#1: MockExchangeReceiver|exchange_receiver_0 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: Join ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_6 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_1 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator |- pipeline#2: MockTableScan|table_scan_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @@ -264,6 +312,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProb ~test_suite_name: Join ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_6 -> Projection|NonTiDBOperator |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator |- pipeline#2: MockExchangeReceiver|exchange_receiver_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @@ -272,6 +321,7 @@ pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperat ~test_suite_name: Join ~result_index: 2 ~result: +query concurrency: 10 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_6 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_7 |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator |- pipeline#2: MockExchangeReceiver|exchange_receiver_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @@ -280,6 +330,7 @@ pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperat ~test_suite_name: JoinThenAgg ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> AggregationBuild|NonTiDBOperator |- pipeline#2: MockTableScan|table_scan_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @@ -287,6 +338,7 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator ~test_suite_name: JoinThenAgg ~result_index: 1 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> AggregationBuild|NonTiDBOperator |- pipeline#2: MockTableScan|table_scan_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @@ -294,6 +346,7 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator ~test_suite_name: JoinThenAgg ~result_index: 2 ~result: +query concurrency: 20 pipeline#0: AggregationConvergent|aggregation_3 -> Limit|limit_4 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_5 |- pipeline#1: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> AggregationBuild|NonTiDBOperator |- pipeline#2: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator @@ -301,24 +354,28 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Limit|limit_4 -> Projection|N ~test_suite_name: ListBase ~result_index: 0 ~result: +query concurrency: 1 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> Limit|limit_4 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ListBase ~result_index: 1 ~result: +query concurrency: 20 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> TopN|topn_4 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: ExpandPlan ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_1 -> Expand|expand_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|project_6 -> TopN|topn_7 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_3 -> Projection|project_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator |- pipeline#2: MockExchangeReceiver|exchange_receiver_0 -> AggregationBuild|NonTiDBOperator @ ~test_suite_name: Expand2Plan ~result_index: 0 ~result: +query concurrency: 10 pipeline#0: AggregationConvergent|aggregation_1 -> Filter|expand_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|project_6 -> TopN|topn_7 -> Projection|NonTiDBOperator |- pipeline#1: MockTableScan|table_scan_3 -> Projection|project_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator |- pipeline#2: MockExchangeReceiver|exchange_receiver_0 -> AggregationBuild|NonTiDBOperator @