From 81059d60754eb8f86d6c62ccd8fc6db2d74eb1eb Mon Sep 17 00:00:00 2001
From: SeaRise <hhssearise@foxmail.com>
Date: Wed, 18 Oct 2023 18:45:29 +0800
Subject: [PATCH] Add some logs about concurrency (#8214)

close pingcap/tiflash#8215
---
 dbms/src/Flash/Executor/PipelineExecutor.cpp  |  2 +-
 .../Flash/Statistics/BaseRuntimeStatistics.h  |  1 +
 .../src/Flash/Statistics/ExecutorStatistics.h |  3 +-
 dbms/src/Flash/Statistics/JoinImpl.cpp        |  6 +-
 .../tests/gtest_pipeline_interpreter.out      | 57 +++++++++++++++++++
 5 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp
index 420e9c949b3..bf58fc31b38 100644
--- a/dbms/src/Flash/Executor/PipelineExecutor.cpp
+++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp
@@ -120,7 +120,7 @@ void PipelineExecutor::cancel()
 String PipelineExecutor::toString() const
 {
     assert(root_pipeline);
-    return root_pipeline->toTreeString();
+    return fmt::format("query concurrency: {}\n{}", context.getMaxStreams(), root_pipeline->toTreeString());
 }
 
 int PipelineExecutor::estimateNewThreadCount()
diff --git a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h
index 99d45789cc7..ac44486476b 100644
--- a/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h
+++ b/dbms/src/Flash/Statistics/BaseRuntimeStatistics.h
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include <Common/FmtUtils.h>
 #include <common/types.h>
 
 namespace DB
diff --git a/dbms/src/Flash/Statistics/ExecutorStatistics.h b/dbms/src/Flash/Statistics/ExecutorStatistics.h
index 6c5c8f5921e..e5f1e616d38 100644
--- a/dbms/src/Flash/Statistics/ExecutorStatistics.h
+++ b/dbms/src/Flash/Statistics/ExecutorStatistics.h
@@ -58,11 +58,12 @@ class ExecutorStatistics : public ExecutorStatisticsBase
             [](const String & child, FmtBuffer & bf) { bf.fmtAppend(R"("{}")", child); },
             ",");
         fmt_buffer.fmtAppend(
-            R"(],"outbound_rows":{},"outbound_blocks":{},"outbound_bytes":{},"outbound_allocated_bytes":{},"execution_time_ns":{})",
+            R"(],"outbound_rows":{},"outbound_blocks":{},"outbound_bytes":{},"outbound_allocated_bytes":{},"concurrency":{},"execution_time_ns":{})",
             base.rows,
             base.blocks,
             base.bytes,
             base.allocated_bytes,
+            base.concurrency,
             base.execution_time_ns);
         if constexpr (ExecutorImpl::has_extra_info)
         {
diff --git a/dbms/src/Flash/Statistics/JoinImpl.cpp b/dbms/src/Flash/Statistics/JoinImpl.cpp
index 190802bbb5b..4f97dc428cc 100644
--- a/dbms/src/Flash/Statistics/JoinImpl.cpp
+++ b/dbms/src/Flash/Statistics/JoinImpl.cpp
@@ -20,8 +20,9 @@ namespace DB
 void JoinStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
 {
     fmt_buffer.fmtAppend(
-        R"("peak_build_bytes_usage":{},"build_side_child":"{}","is_spill_enabled":{},"is_spilled":{})"
-        R"("join_build_inbound_rows":{},"join_build_inbound_blocks":{},"join_build_inbound_bytes":{},"join_build_inbound_allocated_bytes":{}, "join_build_execution_time_ns":{})",
+        R"("peak_build_bytes_usage":{},"build_side_child":"{}","is_spill_enabled":{},"is_spilled":{},)"
+        R"("join_build_inbound_rows":{},"join_build_inbound_blocks":{},"join_build_inbound_bytes":{},)"
+        R"("join_build_inbound_allocated_bytes":{},"join_build_concurrency":{},"join_build_execution_time_ns":{})",
         peak_build_bytes_usage,
         build_side_child,
         is_spill_enabled,
@@ -30,6 +31,7 @@ void JoinStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
         join_build_base.blocks,
         join_build_base.bytes,
         join_build_base.allocated_bytes,
+        join_build_base.concurrency,
         join_build_base.execution_time_ns);
 }
 
diff --git a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out
index 6644946f072..e3b45df476d 100644
--- a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out
+++ b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out
@@ -1,119 +1,141 @@
 ~test_suite_name: SimplePipeline
 ~result_index: 0
 ~result:
+query concurrency: 1
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Filter|selection_1 -> Projection|project_2 -> Limit|limit_3 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_4
 @
 ~test_suite_name: SimplePipeline
 ~result_index: 1
 ~result:
+query concurrency: 1
 pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: StrangeQuery
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> Filter|selection_1 -> Filter|selection_2 -> Filter|selection_3 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: StrangeQuery
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Limit|limit_2 -> Limit|limit_3 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: StrangeQuery
 ~result_index: 2
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> TopN|topn_2 -> TopN|topn_3 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: SingleQueryBlock
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> TopN|topn_4 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: SingleQueryBlock
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> Limit|limit_4 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 0
 ~result:
+query concurrency: 1
 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 1
 ~result:
+query concurrency: 5
 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 2
 ~result:
+query concurrency: 1
 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 3
 ~result:
+query concurrency: 5
 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 4
 ~result:
+query concurrency: 1
 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 5
 ~result:
+query concurrency: 5
 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 6
 ~result:
+query concurrency: 1
 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 7
 ~result:
+query concurrency: 5
 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 8
 ~result:
+query concurrency: 1
 pipeline#0: MockTableScan|table_scan_0 -> Filter|selection_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 9
 ~result:
+query concurrency: 5
 pipeline#0: MockTableScan|table_scan_0 -> Filter|selection_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 10
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 11
 ~result:
+query concurrency: 1
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 12
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 13
 ~result:
+query concurrency: 1
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 14
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: AggregationConvergent|aggregation_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator
   |- pipeline#2: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator
@@ -121,6 +143,7 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
 ~test_suite_name: ParallelQuery
 ~result_index: 15
 ~result:
+query concurrency: 1
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: AggregationConvergent|aggregation_1 -> Projection|project_2 -> AggregationBuild|NonTiDBOperator
   |- pipeline#2: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator
@@ -128,134 +151,159 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
 ~test_suite_name: ParallelQuery
 ~result_index: 16
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
  |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 17
 ~result:
+query concurrency: 1
 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
  |- pipeline#1: MockTableScan|table_scan_0 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 18
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 19
 ~result:
+query concurrency: 1
 pipeline#0: MockTableScan|table_scan_0 -> TopN|topn_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 20
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 21
 ~result:
+query concurrency: 1
 pipeline#0: MockTableScan|table_scan_0 -> Limit|limit_1 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_2
 @
 ~test_suite_name: ParallelQuery
 ~result_index: 22
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_3 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_1 -> Limit|limit_2 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
 @
 ~test_suite_name: MultipleQueryBlockWithSource
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> Projection|project_2 -> Projection|project_3 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: MultipleQueryBlockWithSource
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: MultipleQueryBlockWithSource
 ~result_index: 2
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_4 -> Projection|project_5 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: MultipleQueryBlockWithSource
 ~result_index: 3
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_4 -> Projection|project_5 -> Filter|selection_6 -> Projection|project_7 -> Limit|limit_8 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Projection|project_1 -> TopN|topn_2 -> Projection|project_3 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: MultipleQueryBlockWithSource
 ~result_index: 4
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|project_1 -> Projection|project_2 -> Projection|project_3 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: MultipleQueryBlockWithSource
 ~result_index: 5
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|project_1 -> Projection|project_2 -> Projection|project_3 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_4
 @
 ~test_suite_name: Window
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: Window
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|project_3 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: Window
 ~result_index: 2
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> WindowSort|sort_1 -> Projection|project_2 -> Window|window_3 -> Projection|project_4 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffle
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffle
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffle
 ~result_index: 2
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> WindowSort|sort_1 -> Window|window_2 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffle
 ~result_index: 3
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> TopN|topn_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffleJoin
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> Projection|NonTiDBOperator
  |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffleJoin
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> Projection|NonTiDBOperator
  |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffleAgg
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Aggregation|aggregation_1 -> Projection|NonTiDBOperator
 @
 ~test_suite_name: FineGrainedShuffleAgg
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_1 -> Projection|NonTiDBOperator
  |- pipeline#1: MockExchangeReceiver|exchange_receiver_0 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: Join
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_6 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_1 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
   |- pipeline#2: MockTableScan|table_scan_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
@@ -264,6 +312,7 @@ pipeline#0: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProb
 ~test_suite_name: Join
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_6 -> Projection|NonTiDBOperator
  |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
   |- pipeline#2: MockExchangeReceiver|exchange_receiver_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
@@ -272,6 +321,7 @@ pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperat
 ~test_suite_name: Join
 ~result_index: 2
 ~result:
+query concurrency: 10
 pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_6 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_7
  |- pipeline#1: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
   |- pipeline#2: MockExchangeReceiver|exchange_receiver_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
@@ -280,6 +330,7 @@ pipeline#0: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperat
 ~test_suite_name: JoinThenAgg
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> AggregationBuild|NonTiDBOperator
   |- pipeline#2: MockTableScan|table_scan_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
@@ -287,6 +338,7 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
 ~test_suite_name: JoinThenAgg
 ~result_index: 1
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> AggregationBuild|NonTiDBOperator
   |- pipeline#2: MockTableScan|table_scan_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
@@ -294,6 +346,7 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Projection|NonTiDBOperator
 ~test_suite_name: JoinThenAgg
 ~result_index: 2
 ~result:
+query concurrency: 20
 pipeline#0: AggregationConvergent|aggregation_3 -> Limit|limit_4 -> Projection|NonTiDBOperator -> MockExchangeSender|exchange_sender_5
  |- pipeline#1: MockExchangeReceiver|exchange_receiver_0 -> Projection|NonTiDBOperator -> JoinProbe|Join_2 -> AggregationBuild|NonTiDBOperator
   |- pipeline#2: MockExchangeReceiver|exchange_receiver_1 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator
@@ -301,24 +354,28 @@ pipeline#0: AggregationConvergent|aggregation_3 -> Limit|limit_4 -> Projection|N
 ~test_suite_name: ListBase
 ~result_index: 0
 ~result:
+query concurrency: 1
 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> Limit|limit_4 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ListBase
 ~result_index: 1
 ~result:
+query concurrency: 20
 pipeline#0: AggregationConvergent|aggregation_2 -> Filter|selection_3 -> TopN|topn_4 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_0 -> Filter|selection_1 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: ExpandPlan
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_1 -> Expand|expand_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|project_6 -> TopN|topn_7 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_3 -> Projection|project_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator |- pipeline#2: MockExchangeReceiver|exchange_receiver_0 -> AggregationBuild|NonTiDBOperator
 @
 ~test_suite_name: Expand2Plan
 ~result_index: 0
 ~result:
+query concurrency: 10
 pipeline#0: AggregationConvergent|aggregation_1 -> Filter|expand_2 -> Projection|NonTiDBOperator -> JoinProbe|Join_5 -> Projection|project_6 -> TopN|topn_7 -> Projection|NonTiDBOperator
  |- pipeline#1: MockTableScan|table_scan_3 -> Projection|project_4 -> Projection|NonTiDBOperator -> JoinBuild|NonTiDBOperator |- pipeline#2: MockExchangeReceiver|exchange_receiver_0 -> AggregationBuild|NonTiDBOperator
 @