Skip to content

Commit

Permalink
Pipeline: support partition table scan without optimize (#7474)
Browse files (browse the repository at this point in the history)
ref #6518
  • Loading branch information
SeaRise authored May 17, 2023
1 parent d3cac13 commit d4070a3
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 13 deletions.
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,13 +1064,16 @@ SourceOps DAGStorageInterpreter::buildLocalSourceOps(
return {};
const auto table_query_infos = generateSelectQueryInfos();

/// TODO: support multiple partitions
// TODO: Improve the performance of partition table scans in extreme cases.
// ref https://github.com/pingcap/tiflash/issues/4474
SourceOps source_ops;
for (const auto & table_query_info : table_query_infos)
{
const TableID table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
source_ops = buildLocalSourceOpsForPhysicalTable(exec_status, table_id, query_info, max_block_size);

auto table_source_ops = buildLocalSourceOpsForPhysicalTable(exec_status, table_id, query_info, max_block_size);
source_ops.insert(source_ops.end(), std::make_move_iterator(table_source_ops.begin()), std::make_move_iterator(table_source_ops.end()));
}

LOG_DEBUG(
Expand Down
18 changes: 14 additions & 4 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <Interpreters/Settings.h>
#include <tipb/select.pb.h>

#include <magic_enum.hpp>

namespace DB
{
namespace
Expand Down Expand Up @@ -251,6 +253,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request, const Settings
switch (executor.tp())
{
case tipb::ExecType::TypeTableScan:
case tipb::ExecType::TypePartitionTableScan:
case tipb::ExecType::TypeProjection:
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
Expand All @@ -259,24 +262,31 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request, const Settings
case tipb::ExecType::TypeExchangeReceiver:
case tipb::ExecType::TypeExpand:
case tipb::ExecType::TypeAggregation:
case tipb::ExecType::TypeStreamAgg:
return true;
case tipb::ExecType::TypeWindow:
case tipb::ExecType::TypeSort:
// TODO support non fine grained shuffle.
is_supported = FineGrainedShuffle(&executor).enable();
if (settings.enforce_enable_pipeline)
throw Exception("Pipeline mode does not support non-fine-grained window function, and an error is reported because the setting enforce_enable_pipeline is true.");
return is_supported;
case tipb::ExecType::TypeJoin:
// TODO support spill.
// If force_enable_pipeline is true, it will return true, even if the join does not actually support spill.
is_supported = (settings.max_bytes_before_external_join == 0 || settings.force_enable_pipeline);
// If enforce_enable_pipeline is true, it will return true, even if the join does not actually support spill.
is_supported = (settings.max_bytes_before_external_join == 0 || settings.enforce_enable_pipeline);
return is_supported;
default:
if (settings.enforce_enable_pipeline)
throw Exception(fmt::format(
"Pipeline mode does not support {}, and an error is reported because the setting enforce_enable_pipeline is true.",
magic_enum::enum_name(executor.tp())));
is_supported = false;
return false;
}
});
if (settings.force_enable_pipeline && !is_supported)
throw Exception("There is an unsupported operator, and an error is reported because the setting force_enable_pipeline is true.");
if (settings.enforce_enable_pipeline && !is_supported)
throw Exception("There is an unsupported operator in pipeline model, and an error is reported because the setting enforce_enable_pipeline is true.");
return is_supported;
}
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ PhysicalPlanNodePtr PhysicalJoin::build(

const Settings & settings = context.getSettingsRef();
size_t max_bytes_before_external_join = settings.max_bytes_before_external_join;
if (settings.force_enable_pipeline && max_bytes_before_external_join > 0)
if (settings.enforce_enable_pipeline && max_bytes_before_external_join > 0)
{
// Currently, the pipeline model does not support disk-based join, so when force_enable_pipeline is true, the disk-based join will be disabled.
// Currently, the pipeline model does not support disk-based join, so when enforce_enable_pipeline is true, the disk-based join will be disabled.
max_bytes_before_external_join = 0;
LOG_WARNING(log, "Pipeline model does not support disk-based join, so set max_bytes_before_external_join = 0");
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ QueryExecutorPtr executeAsBlockIO(Context & context, bool internal)

QueryExecutorPtr queryExecute(Context & context, bool internal)
{
if (context.getSettingsRef().force_enable_pipeline)
if (context.getSettingsRef().enforce_enable_pipeline)
{
RUNTIME_CHECK_MSG(
context.getSharedContextDisagg()->notDisaggregatedMode(),
"The pipeline model does not support storage-computing separation mode, and an error is reported because the setting force_enable_pipeline is true.");
"The pipeline model does not support storage-computing separation mode, and an error is reported because the setting enforce_enable_pipeline is true.");
auto res = executeAsPipeline(context, internal);
RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting force_enable_pipeline is true.");
RUNTIME_CHECK_MSG(res, "Failed to execute query using pipeline model, and an error is reported because the setting enforce_enable_pipeline is true.");
return std::move(*res);
}
if (context.getSettingsRef().enable_planner
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ struct Settings
M(SettingUInt64, max_spilled_bytes_per_file, 0, "Max spilled data bytes per spill file, 0 as the default value, 0 means no limit.") \
M(SettingBool, enable_planner, true, "Enable planner") \
M(SettingBool, enable_pipeline, false, "Enable pipeline model") \
M(SettingBool, force_enable_pipeline, false, "Force the enablement of the pipeline model") \
M(SettingBool, enforce_enable_pipeline, false, "Enforce the enablement of the pipeline model") \
M(SettingUInt64, pipeline_cpu_task_thread_pool_size, 0, "The size of cpu task thread pool. 0 means using number_of_logical_cpu_cores.") \
M(SettingUInt64, pipeline_io_task_thread_pool_size, 0, "The size of io task thread pool. 0 means using number_of_logical_cpu_cores.") \
M(SettingTaskQueueType, pipeline_cpu_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of cpu task thread pool") \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
});

// For test mode, TaskScheduler is controlled by test case.
bool enable_pipeline = (settings.enable_pipeline || settings.force_enable_pipeline) && !global_context->isTest();
bool enable_pipeline = (settings.enable_pipeline || settings.enforce_enable_pipeline) && !global_context->isTest();
if (enable_pipeline)
{
auto get_pool_size = [](const auto & setting) {
Expand Down

0 comments on commit d4070a3

Please sign in to comment.