diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index 24c5496ab50..022afcda605 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -147,11 +147,12 @@ class ComputeServerRunner : public DB::tests::MPPTaskTestUtils BlockInputStreamPtr stream; try { - auto tasks = prepareMPPTasks( - context.scan("test_db", "l_table") + std::function gen_builder = [&]() { + return context.scan("test_db", "l_table") .aggregation({Max(col("l_table.s"))}, {col("l_table.s")}) - .project({col("max(l_table.s)"), col("l_table.s")}), - properties); + .project({col("max(l_table.s)"), col("l_table.s")}); + }; + QueryTasks tasks = prepareMPPTasks(gen_builder, properties); executeProblematicMPPTasks(tasks, properties, stream); } catch (...) diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index adf51f5f5d7..d718a4d37cb 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -83,6 +83,7 @@ void MPPTaskTestUtils::startServers(size_t server_num_) } MockComputeServerManager::instance().startServers(log_ptr, test_meta.context_idx); + MockComputeServerManager::instance().setMockStorage(context.mockStorage()); MockServerAddrGenerator::instance().reset(); } @@ -112,7 +113,17 @@ std::vector MPPTaskTestUtils::prepareMPPTasks(DAGRequestBuilder build auto tasks = builder.buildMPPTasks(context, properties); for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i) TiFlashTestEnv::getGlobalContext(i).setCancelTest(); - MockComputeServerManager::instance().setMockStorage(context.mockStorage()); + return tasks; +} + +std::vector MPPTaskTestUtils::prepareMPPTasks( + std::function & gen_builder, + const DAGProperties & properties) +{ + std::lock_guard lock(mu); + auto tasks = gen_builder().buildMPPTasks(context, properties); + for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i) + TiFlashTestEnv::getGlobalContext(i).setCancelTest(); return tasks; } diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.h b/dbms/src/TestUtils/MPPTaskTestUtils.h index 3023cf24b85..625a326baf2 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.h +++ b/dbms/src/TestUtils/MPPTaskTestUtils.h @@ -78,8 +78,14 @@ class MPPTaskTestUtils : public ExecutorTest // run mpp tasks which are ready to cancel, the return value is the start_ts of query. BlockInputStreamPtr prepareMPPStreams(DAGRequestBuilder builder, const DAGProperties & properties); + // prepareMPPTasks is not thread safe, the builder's executor_index(which is ref to context's index) is updated during this process std::vector prepareMPPTasks(DAGRequestBuilder builder, const DAGProperties & properties); + // prepareMPPTasks is thread safe + std::vector prepareMPPTasks( + std::function & gen_builder, + const DAGProperties & properties); + static void setCancelTest(); /// Keep stream not deconstructed until cancelGather invoked outside, so that the deconstruction progress won't block