From d4070a335c5bff1bf314d30829a0843a6171daee Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 17 May 2023 17:17:35 +0800 Subject: [PATCH] Pipeline: support partition table scan without optimize (#7474) ref pingcap/tiflash#6518 --- .../Coprocessor/DAGStorageInterpreter.cpp | 7 +++++-- dbms/src/Flash/Pipeline/Pipeline.cpp | 18 ++++++++++++++---- dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp | 4 ++-- dbms/src/Flash/executeQuery.cpp | 6 +++--- dbms/src/Interpreters/Settings.h | 2 +- dbms/src/Server/Server.cpp | 2 +- 6 files changed, 26 insertions(+), 13 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 052af71b8bc..66364495974 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1064,13 +1064,16 @@ SourceOps DAGStorageInterpreter::buildLocalSourceOps( return {}; const auto table_query_infos = generateSelectQueryInfos(); - /// TODO: support multiple partitions + // TODO Improve the performance of partition table in extreme case. 
+ // ref https://github.com/pingcap/tiflash/issues/4474 SourceOps source_ops; for (const auto & table_query_info : table_query_infos) { const TableID table_id = table_query_info.first; const SelectQueryInfo & query_info = table_query_info.second; - source_ops = buildLocalSourceOpsForPhysicalTable(exec_status, table_id, query_info, max_block_size); + + auto table_source_ops = buildLocalSourceOpsForPhysicalTable(exec_status, table_id, query_info, max_block_size); + source_ops.insert(source_ops.end(), std::make_move_iterator(table_source_ops.begin()), std::make_move_iterator(table_source_ops.end())); } LOG_DEBUG( diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 8cd3eda691f..e842e844971 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -26,6 +26,8 @@ #include #include +#include <magic_enum.hpp> + namespace DB { namespace @@ -251,6 +253,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request, const Settings switch (executor.tp()) { case tipb::ExecType::TypeTableScan: + case tipb::ExecType::TypePartitionTableScan: case tipb::ExecType::TypeProjection: case tipb::ExecType::TypeSelection: case tipb::ExecType::TypeLimit: @@ -259,24 +262,31 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request, const Settings case tipb::ExecType::TypeExchangeReceiver: case tipb::ExecType::TypeExpand: case tipb::ExecType::TypeAggregation: + case tipb::ExecType::TypeStreamAgg: return true; case tipb::ExecType::TypeWindow: case tipb::ExecType::TypeSort: // TODO support non fine grained shuffle. is_supported = FineGrainedShuffle(&executor).enable(); + if (!is_supported && settings.enforce_enable_pipeline) + throw Exception("Pipeline mode does not support non-fine-grained window function, and an error is reported because the setting enforce_enable_pipeline is true."); return is_supported; case tipb::ExecType::TypeJoin: // TODO support spill. 
- // If force_enable_pipeline is true, it will return true, even if the join does not actually support spill. - is_supported = (settings.max_bytes_before_external_join == 0 || settings.force_enable_pipeline); + // If enforce_enable_pipeline is true, it will return true, even if the join does not actually support spill. + is_supported = (settings.max_bytes_before_external_join == 0 || settings.enforce_enable_pipeline); return is_supported; default: + if (settings.enforce_enable_pipeline) + throw Exception(fmt::format( + "Pipeline mode does not support {}, and an error is reported because the setting enforce_enable_pipeline is true.", + magic_enum::enum_name(executor.tp()))); is_supported = false; return false; } }); - if (settings.force_enable_pipeline && !is_supported) - throw Exception("There is an unsupported operator, and an error is reported because the setting force_enable_pipeline is true."); + if (settings.enforce_enable_pipeline && !is_supported) + throw Exception("There is an unsupported operator in pipeline model, and an error is reported because the setting enforce_enable_pipeline is true."); return is_supported; } } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index 190da07a764..f6a0c102a92 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -122,9 +122,9 @@ PhysicalPlanNodePtr PhysicalJoin::build( const Settings & settings = context.getSettingsRef(); size_t max_bytes_before_external_join = settings.max_bytes_before_external_join; - if (settings.force_enable_pipeline && max_bytes_before_external_join > 0) + if (settings.enforce_enable_pipeline && max_bytes_before_external_join > 0) { - // Currently, the pipeline model does not support disk-based join, so when force_enable_pipeline is true, the disk-based join will be disabled. 
+ // Currently, the pipeline model does not support disk-based join, so when enforce_enable_pipeline is true, the disk-based join will be disabled. max_bytes_before_external_join = 0; LOG_WARNING(log, "Pipeline model does not support disk-based join, so set max_bytes_before_external_join = 0"); } diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp index 6b8d1723292..f5a18277f64 100644 --- a/dbms/src/Flash/executeQuery.cpp +++ b/dbms/src/Flash/executeQuery.cpp @@ -162,13 +162,13 @@ QueryExecutorPtr executeAsBlockIO(Context & context, bool internal) QueryExecutorPtr queryExecute(Context & context, bool internal) { - if (context.getSettingsRef().force_enable_pipeline) + if (context.getSettingsRef().enforce_enable_pipeline) { RUNTIME_CHECK_MSG( context.getSharedContextDisagg()->notDisaggregatedMode(), - "The pipeline model does not support storage-computing separation mode, and an error is reported because the setting force_enable_pipeline is true."); + "The pipeline model does not support storage-computing separation mode, and an error is reported because the setting enforce_enable_pipeline is true."); auto res = executeAsPipeline(context, internal); - RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting force_enable_pipeline is true."); + RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting enforce_enable_pipeline is true."); return std::move(*res); } if (context.getSettingsRef().enable_planner diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index a59227335c1..008a29978ab 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -290,7 +290,7 @@ struct Settings M(SettingUInt64, max_spilled_bytes_per_file, 0, "Max spilled data bytes per spill file, 0 as the default value, 0 means no limit.") \ M(SettingBool, enable_planner, true, "Enable planner") \ 
M(SettingBool, enable_pipeline, false, "Enable pipeline model") \ - M(SettingBool, force_enable_pipeline, false, "Force the enablement of the pipeline model") \ + M(SettingBool, enforce_enable_pipeline, false, "Enforce the enablement of the pipeline model") \ M(SettingUInt64, pipeline_cpu_task_thread_pool_size, 0, "The size of cpu task thread pool. 0 means using number_of_logical_cpu_cores.") \ M(SettingUInt64, pipeline_io_task_thread_pool_size, 0, "The size of io task thread pool. 0 means using number_of_logical_cpu_cores.") \ M(SettingTaskQueueType, pipeline_cpu_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of cpu task thread pool") \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 772690be399..09d83e2172d 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1506,7 +1506,7 @@ int Server::main(const std::vector & /*args*/) }); // For test mode, TaskScheduler is controlled by test case. - bool enable_pipeline = (settings.enable_pipeline || settings.force_enable_pipeline) && !global_context->isTest(); + bool enable_pipeline = (settings.enable_pipeline || settings.enforce_enable_pipeline) && !global_context->isTest(); if (enable_pipeline) { auto get_pool_size = [](const auto & setting) {