From 5bd0e891eef0dcaecf35198b3051f71c20939c0a Mon Sep 17 00:00:00 2001 From: yanweiqi <592838129@qq.com> Date: Wed, 28 Dec 2022 18:12:16 +0800 Subject: [PATCH] Test: Refine fine-grained shuffle ut (#6557) ref pingcap/tiflash#4609 --- .../Debug/MockExecutor/AggregationBinder.cpp | 4 +- .../MockExecutor/ExchangeSenderBinder.cpp | 2 + .../Debug/MockExecutor/ExchangeSenderBinder.h | 4 +- dbms/src/Debug/MockExecutor/JoinBinder.cpp | 8 +-- .../tests/gtest_aggregation_executor.cpp | 19 ++++--- dbms/src/Flash/tests/gtest_compute_server.cpp | 52 ------------------- dbms/src/Flash/tests/gtest_limit_executor.cpp | 6 +-- .../Flash/tests/gtest_projection_executor.cpp | 12 ++--- dbms/src/Flash/tests/gtest_topn_executor.cpp | 11 ++-- 9 files changed, 35 insertions(+), 83 deletions(-) diff --git a/dbms/src/Debug/MockExecutor/AggregationBinder.cpp b/dbms/src/Debug/MockExecutor/AggregationBinder.cpp index e95346af901..9817a0c8cf7 100644 --- a/dbms/src/Debug/MockExecutor/AggregationBinder.cpp +++ b/dbms/src/Debug/MockExecutor/AggregationBinder.cpp @@ -92,11 +92,11 @@ void AggregationBinder::toMPPSubPlan(size_t & executor_index, const DAGPropertie } std::shared_ptr exchange_sender - = std::make_shared(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys); + = std::make_shared(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys, fine_grained_shuffle_stream_count); exchange_sender->children.push_back(partial_agg); std::shared_ptr exchange_receiver - = std::make_shared(executor_index, output_schema_for_partial_agg); + = std::make_shared(executor_index, output_schema_for_partial_agg, fine_grained_shuffle_stream_count); exchange_map[exchange_receiver->name] = std::make_pair(exchange_receiver, exchange_sender); /// re-construct agg_exprs and gby_exprs in final_agg diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp index 45abb7de9fa..88bfc19c4fb 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp @@ -28,6 +28,8 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_ tipb_executor->set_executor_id(name); tipb::ExchangeSender * exchange_sender = tipb_executor->mutable_exchange_sender(); exchange_sender->set_tp(type); + if (tipb_executor->exchange_sender().tp() == tipb::Hash) + tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count); for (auto i : partition_keys) { auto * expr = exchange_sender->add_partition_keys(); diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h index ed6710ac22e..c03899351eb 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h @@ -22,10 +22,11 @@ namespace DB::mock class ExchangeSenderBinder : public ExecutorBinder { public: - ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector & partition_keys_ = {}) + ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector & partition_keys_ = {}, uint64_t fine_grained_shuffle_stream_count_ = 0) : ExecutorBinder(index, "exchange_sender_" + std::to_string(index), output) , type(type_) , partition_keys(partition_keys_) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) {} bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; @@ -38,6 +39,7 @@ class ExchangeSenderBinder : public ExecutorBinder tipb::ExchangeType type; TaskMetas task_metas; std::vector partition_keys; + uint64_t fine_grained_shuffle_stream_count; }; ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type); diff --git a/dbms/src/Debug/MockExecutor/JoinBinder.cpp b/dbms/src/Debug/MockExecutor/JoinBinder.cpp index e9bc36bc5d0..ea704084554 100644 --- a/dbms/src/Debug/MockExecutor/JoinBinder.cpp +++ b/dbms/src/Debug/MockExecutor/JoinBinder.cpp @@ -224,16 +224,16 @@ void JoinBinder::toMPPSubPlan(size_t & executor_index, const DAGProperties & pro } std::shared_ptr left_exchange_sender - = std::make_shared(executor_index, children[0]->output_schema, tipb::Hash, left_partition_keys); + = std::make_shared(executor_index, children[0]->output_schema, tipb::Hash, left_partition_keys, fine_grained_shuffle_stream_count); left_exchange_sender->children.push_back(children[0]); std::shared_ptr right_exchange_sender - = std::make_shared(executor_index, children[1]->output_schema, tipb::Hash, right_partition_keys); + = std::make_shared(executor_index, children[1]->output_schema, tipb::Hash, right_partition_keys, fine_grained_shuffle_stream_count); right_exchange_sender->children.push_back(children[1]); std::shared_ptr left_exchange_receiver - = std::make_shared(executor_index, children[0]->output_schema); + = std::make_shared(executor_index, children[0]->output_schema, fine_grained_shuffle_stream_count); std::shared_ptr right_exchange_receiver - = std::make_shared(executor_index, children[1]->output_schema); + = std::make_shared(executor_index, children[1]->output_schema, fine_grained_shuffle_stream_count); children[0] = left_exchange_receiver; children[1] = right_exchange_receiver; diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index b95f847c42a..e5bd3c692ef 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -30,7 +30,7 @@ namespace tests types_col_name[a], types_col_name[b] \ } -class ExecutorAggTestRunner : public ExecutorTest +class AggExecutorTestRunner : public ExecutorTest { public: using ColStringNullableType = std::optional::FieldType>; @@ -61,7 +61,7 @@ class ExecutorAggTestRunner : public ExecutorTest using ColumnWithFloat64 = std::vector; using ColumnWithString = std::vector; - ~ExecutorAggTestRunner() override = default; + ~AggExecutorTestRunner() override = default; void initializeContext() override { @@ -225,7 +225,7 @@ class ExecutorAggTestRunner : public ExecutorTest }; /// Guarantee the correctness of group by -TEST_F(ExecutorAggTestRunner, GroupBy) +TEST_F(AggExecutorTestRunner, GroupBy) try { std::shared_ptr request; @@ -296,7 +296,7 @@ try } CATCH -TEST_F(ExecutorAggTestRunner, AggregationMaxAndMin) +TEST_F(AggExecutorTestRunner, AggregationMaxAndMin) try { std::shared_ptr request; @@ -344,7 +344,7 @@ try } CATCH -TEST_F(ExecutorAggTestRunner, AggregationCount) +TEST_F(AggExecutorTestRunner, AggregationCount) try { /// Prepare some data @@ -388,8 +388,7 @@ CATCH // TODO support more type of min, max, count. // support more aggregation functions: sum, forst_row, group_concat - -TEST_F(ExecutorAggTestRunner, AggregationCountGroupByFastPathMultiKeys) +TEST_F(AggExecutorTestRunner, AggregationCountGroupByFastPathMultiKeys) try { /// Prepare some data @@ -492,7 +491,7 @@ try } CATCH -TEST_F(ExecutorAggTestRunner, AggNull) +TEST_F(AggExecutorTestRunner, AggNull) try { auto request = context @@ -509,7 +508,7 @@ try } CATCH -TEST_F(ExecutorAggTestRunner, RepeatedAggregateFunction) +TEST_F(AggExecutorTestRunner, RepeatedAggregateFunction) try { std::vector functions = {Max(col("s1")), Min(col("s1")), Sum(col("s2"))}; @@ -557,7 +556,7 @@ try } CATCH -TEST_F(ExecutorAggTestRunner, AggMerge) +TEST_F(AggExecutorTestRunner, AggMerge) try { std::vector tables{"big_table_1", "big_table_2", "big_table_3"}; diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index ab53fe00392..8a81a5650dd 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -502,49 +502,6 @@ try } CATCH -/// For FineGrainedShuffleJoin/Agg test usage, update internal exchange senders/receivers flag -/// Allow select,agg,join,tableScan,exchangeSender,exchangeReceiver,projection executors only -void setFineGrainedShuffleForExchange(tipb::Executor & root) -{ - tipb::Executor * current = &root; - while (current) - { - switch (current->tp()) - { - case tipb::ExecType::TypeSelection: - current = const_cast(¤t->selection().child()); - break; - case tipb::ExecType::TypeAggregation: - current = const_cast(¤t->aggregation().child()); - break; - case tipb::ExecType::TypeProjection: - current = const_cast(¤t->projection().child()); - break; - case tipb::ExecType::TypeJoin: - { - /// update build side path - JoinInterpreterHelper::TiFlashJoin tiflash_join{current->join()}; - current = const_cast(¤t->join().children()[tiflash_join.build_side_index]); - break; - } - case tipb::ExecType::TypeExchangeSender: - if (current->exchange_sender().tp() == tipb::Hash) - current->set_fine_grained_shuffle_stream_count(8); - current = const_cast(¤t->exchange_sender().child()); - break; - case tipb::ExecType::TypeExchangeReceiver: - current->set_fine_grained_shuffle_stream_count(8); - current = nullptr; - break; - case tipb::ExecType::TypeTableScan: - current = nullptr; - break; - default: - throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal); - } - } -} - TEST_F(ComputeServerRunner, runFineGrainedShuffleJoinTest) try { @@ -578,10 +535,6 @@ try .join(context.scan("test_db", "r_table_2"), join_type, {col("s1"), col("s2")}, enable) .project({col("l_table_2.s1"), col("l_table_2.s2"), col("l_table_2.s3")}); auto tasks = request2.buildMPPTasks(context, properties); - for (auto & task : tasks) - { - setFineGrainedShuffleForExchange(const_cast(task.dag_request->root_executor())); - } const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap()); ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols); } @@ -606,11 +559,6 @@ try .scan("test_db", "test_table_2") .aggregation({Max(col("s3"))}, {col("s1"), col("s2")}, enable); auto tasks = request2.buildMPPTasks(context, properties); - for (auto & task : tasks) - { - setFineGrainedShuffleForExchange(const_cast(task.dag_request->root_executor())); - } - const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap()); ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols); } diff --git a/dbms/src/Flash/tests/gtest_limit_executor.cpp b/dbms/src/Flash/tests/gtest_limit_executor.cpp index 140d58f1a2c..049efafc333 100644 --- a/dbms/src/Flash/tests/gtest_limit_executor.cpp +++ b/dbms/src/Flash/tests/gtest_limit_executor.cpp @@ -20,7 +20,7 @@ namespace DB namespace tests { -class ExecutorLimitTestRunner : public DB::tests::ExecutorTest +class LimitExecutorTestRunner : public DB::tests::ExecutorTest { public: using ColDataType = std::optional::FieldType>; @@ -47,7 +47,7 @@ class ExecutorLimitTestRunner : public DB::tests::ExecutorTest const ColumnWithData col0{"col0-0", {}, "col0-2", "col0-3", {}, "col0-5", "col0-6", "col0-7"}; }; -TEST_F(ExecutorLimitTestRunner, Limit) +TEST_F(LimitExecutorTestRunner, Limit) try { std::shared_ptr request; @@ -77,7 +77,7 @@ try } CATCH -TEST_F(ExecutorLimitTestRunner, RawQuery) +TEST_F(LimitExecutorTestRunner, RawQuery) try { String query = "select * from test_db.projection_test_table limit 1"; diff --git a/dbms/src/Flash/tests/gtest_projection_executor.cpp b/dbms/src/Flash/tests/gtest_projection_executor.cpp index 599044f5503..25a234ee385 100644 --- a/dbms/src/Flash/tests/gtest_projection_executor.cpp +++ b/dbms/src/Flash/tests/gtest_projection_executor.cpp @@ -20,7 +20,7 @@ namespace DB namespace tests { -class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest +class ProjectionExecutorTestRunner : public DB::tests::ExecutorTest { public: using ColDataString = std::vector::FieldType>>; @@ -94,7 +94,7 @@ class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest const String table_name{"projection_test_table"}; }; -TEST_F(ExecutorProjectionTestRunner, Projection) +TEST_F(ProjectionExecutorTestRunner, Projection) try { /// Check single column @@ -141,7 +141,7 @@ try } CATCH -TEST_F(ExecutorProjectionTestRunner, ProjectionFunction) +TEST_F(ProjectionExecutorTestRunner, ProjectionFunction) try { std::shared_ptr request; @@ -231,7 +231,7 @@ try } CATCH -TEST_F(ExecutorProjectionTestRunner, MultiFunction) +TEST_F(ProjectionExecutorTestRunner, MultiFunction) try { MockAstVec functions = { @@ -308,7 +308,7 @@ try } CATCH -TEST_F(ExecutorProjectionTestRunner, MultiProjection) +TEST_F(ProjectionExecutorTestRunner, MultiProjection) try { auto req = context @@ -375,7 +375,7 @@ try } CATCH -TEST_F(ExecutorProjectionTestRunner, ProjectionThenAgg) +TEST_F(ProjectionExecutorTestRunner, ProjectionThenAgg) try { auto req = context diff --git a/dbms/src/Flash/tests/gtest_topn_executor.cpp b/dbms/src/Flash/tests/gtest_topn_executor.cpp index 407676c8a3e..3cd848643ba 100644 --- a/dbms/src/Flash/tests/gtest_topn_executor.cpp +++ b/dbms/src/Flash/tests/gtest_topn_executor.cpp @@ -19,7 +19,8 @@ namespace DB { namespace tests { -class ExecutorTopNTestRunner : public DB::tests::ExecutorTest + +class TopNExecutorTestRunner : public DB::tests::ExecutorTest { public: using ColStringType = std::optional::FieldType>; @@ -85,7 +86,7 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest std::shared_ptr buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockColumnNameVec out_proj_ast = {}) { - if (func_proj_ast.size() == 0) + if (func_proj_ast.empty()) return context.scan(db_name, table_name).topN(order_by_items, limit).build(context); else return context.scan(db_name, table_name).project(func_proj_ast).topN(order_by_items, limit).project(out_proj_ast).build(context); @@ -106,7 +107,7 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest ColumnWithInt32 col_salary{1300, 0, {}, 900, {}, -300}; }; -TEST_F(ExecutorTopNTestRunner, TopN) +TEST_F(TopNExecutorTestRunner, TopN) try { std::shared_ptr request; @@ -173,7 +174,7 @@ try } CATCH -TEST_F(ExecutorTopNTestRunner, TopNFunction) +TEST_F(TopNExecutorTestRunner, TopNFunction) try { std::shared_ptr request; @@ -245,7 +246,7 @@ try } CATCH -TEST_F(ExecutorTopNTestRunner, BigTable) +TEST_F(TopNExecutorTestRunner, BigTable) try { std::vector tables{"big_table_1", "big_table_2", "big_table_3"};