From 927b52fdbc891869474839a0f33ccdada20c82e8 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 1 Sep 2022 11:54:25 +0800 Subject: [PATCH] Add UT for list struct based executors (#5734) close pingcap/tiflash#5534 --- dbms/src/Flash/tests/gtest_interpreter.cpp | 45 +++++++++++ .../Flash/tests/gtest_planner_interpreter.cpp | 52 +++++++++++++ dbms/src/TestUtils/executorSerializer.cpp | 74 ++++++++++++------- dbms/src/TestUtils/executorSerializer.h | 3 +- dbms/src/TestUtils/mockExecutor.cpp | 18 ++++- dbms/src/TestUtils/mockExecutor.h | 8 +- .../TestUtils/tests/gtest_mock_executors.cpp | 39 ++++++++++ 7 files changed, 211 insertions(+), 28 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index a053c2743fb..ff0d2f05e34 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -631,5 +631,50 @@ CreatingSets } CATCH +TEST_F(InterpreterExecuteTest, ListBase) +try +{ + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .limit(10) + .build(context, DAGRequestType::list); + String expected = R"( +Limit, limit = 10 + Expression: + Aggregating + Concat + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .topN("s2", false, 10) + .build(context, DAGRequestType::list); + String expected = R"( +Union: + SharedQuery x 20: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 20: limit = 10 + SharedQuery: + ParallelAggregating, max_threads: 20, final: true + Expression x 20: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index 038af91f725..744126d5f7c 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -984,5 +984,57 @@ CreatingSets } CATCH +TEST_F(PlannerInterpreterExecuteTest, ListBase) +try +{ + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .limit(10) + .build(context, DAGRequestType::list); + String expected = R"( +Expression: + Limit, limit = 10 + Filter + Expression: + Aggregating + Concat + Expression: + Filter + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .topN("s2", false, 10) + .build(context, DAGRequestType::list); + String expected = R"( +Union: + Expression x 20: + SharedQuery: + MergeSorting, limit = 10 + Union: + PartialSorting x 20: limit = 10 + Expression: + Filter + Expression: + SharedQuery: + ParallelAggregating, max_threads: 20, final: true + Expression x 20: + Filter + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/TestUtils/executorSerializer.cpp b/dbms/src/TestUtils/executorSerializer.cpp index a0ae4b11270..5910fde5710 100644 --- a/dbms/src/TestUtils/executorSerializer.cpp +++ b/dbms/src/TestUtils/executorSerializer.cpp @@ -57,29 +57,6 @@ void toString(const Columns & columns, FmtBuffer & buf) } buf.fmtAppend("<{}, {}>", bound, getColumnTypeName(columns.at(bound))); } -} // namespace - -String ExecutorSerializer::serialize(const tipb::DAGRequest * dag_request) -{ - assert((dag_request->executors_size() > 0) != dag_request->has_root_executor()); - if (dag_request->has_root_executor()) - { - serialize(dag_request->root_executor(), 0); - return buf.toString(); - } - else - { - FmtBuffer buffer; - String prefix; - traverseExecutors(dag_request, [this, &prefix](const tipb::Executor & executor) { - assert(executor.has_executor_id()); - buf.fmtAppend("{}{}\n", prefix, executor.executor_id()); - prefix.append(" "); - return true; - }); - return buffer.toString(); - } -} void serializeTableScan(const String & executor_id, const tipb::TableScan & ts, FmtBuffer & buf) { @@ -263,8 +240,55 @@ void serializeSort(const String & executor_id, const tipb::Sort & sort [[maybe_u ", "); buf.append("}\n"); } +} // namespace + +String ExecutorSerializer::serialize(const tipb::DAGRequest * dag_request) +{ + assert((dag_request->executors_size() > 0) != dag_request->has_root_executor()); + if (dag_request->has_root_executor()) + { + serializeTreeStruct(dag_request->root_executor(), 0); + } + else + { + serializeListStruct(dag_request); + } + return buf.toString(); +} + +void ExecutorSerializer::serializeListStruct(const tipb::DAGRequest * dag_request) +{ + String prefix; + traverseExecutors(dag_request, [this, &prefix](const tipb::Executor & executor) { + buf.append(prefix); + switch (executor.tp()) + { + case tipb::ExecType::TypeTableScan: + serializeTableScan("TableScan", executor.tbl_scan(), buf); + break; + case tipb::ExecType::TypeSelection: + serializeSelection("Selection", executor.selection(), buf); + break; + case tipb::ExecType::TypeAggregation: + // stream agg is not supported, treated as normal agg + case tipb::ExecType::TypeStreamAgg: + serializeAggregation("Aggregation", executor.aggregation(), buf); + break; + case tipb::ExecType::TypeTopN: + serializeTopN("TopN", executor.topn(), buf); + break; + case tipb::ExecType::TypeLimit: + serializeLimit("Limit", executor.limit(), buf); + break; + default: + throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal); + } + prefix.append(" "); + return true; + }); +} -void ExecutorSerializer::serialize(const tipb::Executor & root_executor, size_t level) +void ExecutorSerializer::serializeTreeStruct(const tipb::Executor & root_executor, size_t level) { auto append_str = [&level, this](const tipb::Executor & executor) { assert(executor.has_executor_id()); @@ -324,7 +348,7 @@ void ExecutorSerializer::serialize(const tipb::Executor & root_executor, size_t if (executor.has_join()) { for (const auto & child : executor.join().children()) - serialize(child, level); + serializeTreeStruct(child, level); return false; } return true; diff --git a/dbms/src/TestUtils/executorSerializer.h b/dbms/src/TestUtils/executorSerializer.h index 048c0564250..e2b8c77be88 100644 --- a/dbms/src/TestUtils/executorSerializer.h +++ b/dbms/src/TestUtils/executorSerializer.h @@ -28,7 +28,8 @@ class ExecutorSerializer String serialize(const tipb::DAGRequest * dag_request); private: - void serialize(const tipb::Executor & root_executor, size_t level); + void serializeListStruct(const tipb::DAGRequest * dag_request); + void serializeTreeStruct(const tipb::Executor & root_executor, size_t level); void addPrefix(size_t level) { buf.append(String(level, ' ')); } private: diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index 960c686ae8b..4ed72917436 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -84,8 +85,9 @@ void DAGRequestBuilder::initDAGRequest(tipb::DAGRequest & dag_request) } // traval the AST tree to build tipb::Executor recursively. -std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext & mock_context) +std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext & mock_context, DAGRequestType type) { + // build tree struct base executor MPPInfo mpp_info(properties.start_ts, -1, -1, {}, mock_context.receiver_source_task_ids_map); std::shared_ptr dag_request_ptr = std::make_shared(); tipb::DAGRequest & dag_request = *dag_request_ptr; @@ -93,6 +95,20 @@ std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext root->toTiPBExecutor(dag_request.mutable_root_executor(), properties.collator, mpp_info, mock_context.context); root.reset(); executor_index = 0; + + // convert to list struct base executor + if (type == DAGRequestType::list) + { + auto & mutable_executors = *dag_request_ptr->mutable_executors(); + traverseExecutorsReverse(dag_request_ptr.get(), [&](const tipb::Executor & executor) -> bool { + auto * mutable_executor = mutable_executors.Add(); + (*mutable_executor) = executor; + mutable_executor->clear_executor_id(); + return true; + }); + dag_request_ptr->release_root_executor(); + } + return dag_request_ptr; } diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 45e56e63264..0b6850669fb 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -43,6 +43,12 @@ inline int32_t convertToTiDBCollation(int32_t collation) return -(abs(collation)); } +enum class DAGRequestType +{ + tree, + list, +}; + /** Responsible for Hand write tipb::DAGRequest * Use this class to mock DAGRequest, then feed the DAGRequest into * the Interpreter for test purpose. @@ -70,7 +76,7 @@ class DAGRequestBuilder return root; } - std::shared_ptr build(MockDAGRequestContext & mock_context); + std::shared_ptr build(MockDAGRequestContext & mock_context, DAGRequestType type = DAGRequestType::tree); QueryTasks buildMPPTasks(MockDAGRequestContext & mock_context); QueryTasks buildMPPTasks(MockDAGRequestContext & mock_context, const DAGProperties & properties); diff --git a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp index ad9c7790f9e..5a58c8fb2f4 100644 --- a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp +++ b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp @@ -340,5 +340,44 @@ try } } CATCH + +TEST_F(MockDAGRequestTest, ListBase) +try +{ + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .limit(10) + .build(context, DAGRequestType::list); + String expected = R"( +Limit | 10 + Selection | equals(<1, String>, <-5692549928996306944, String>)} + Aggregation | group_by: {<1, String>}, agg_func: {max(<0, String>)} + Selection | equals(<0, String>, <1, String>)} + TableScan | {<0, String>, <1, String>})"; + ASSERT_DAGREQUEST_EQAUL(expected, request); + } + + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .topN("s2", false, 10) + .build(context, DAGRequestType::list); + String expected = R"( +TopN | order_by: {(<1, String>, desc: false)}, limit: 10 + Selection | equals(<1, String>, <-5692549928996306944, String>)} + Aggregation | group_by: {<1, String>}, agg_func: {max(<0, String>)} + Selection | equals(<0, String>, <1, String>)} + TableScan | {<0, String>, <1, String>})"; + ASSERT_DAGREQUEST_EQAUL(expected, request); + } +} +CATCH } // namespace tests } // namespace DB