refactor DAGQueryBlockInterpreter (#3639)
SeaRise authored Dec 14, 2021
1 parent 4b310e0 commit b2d3ba1
Showing 2 changed files with 118 additions and 83 deletions.
192 changes: 109 additions & 83 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -911,6 +911,105 @@ void DAGQueryBlockInterpreter::executeRemoteQueryImpl(
}
}

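/// Builds the input streams for this query block's ExchangeReceiver source: one
/// ExchangeReceiverInputStream per stream (each wrapped in a SquashingBlockInputStream),
/// then initializes the analyzer with the schema of the received blocks.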
void DAGQueryBlockInterpreter::executeExchangeReceiver(DAGPipeline & pipeline)
{
    auto it = exchange_receiver_map.find(query_block.source_name);
    if (unlikely(it == exchange_receiver_map.end()))
        throw Exception("Can not find exchange receiver for " + query_block.source_name, ErrorCodes::LOGICAL_ERROR);
    // todo choose a more reasonable stream number
    for (size_t i = 0; i < max_streams; ++i)
    {
        BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, taskLogger());
        dagContext().getRemoteInputStreams().push_back(stream);
        stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, taskLogger());
        pipeline.streams.push_back(stream);
    }
    std::vector<NameAndTypePair> source_columns;
    Block block = pipeline.firstStream()->getHeader();
    for (const auto & col : block.getColumnsWithTypeAndName())
    {
        source_columns.emplace_back(NameAndTypePair(col.name, col.type));
    }
    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}

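/// Evaluates the expressions of a source Projection executor, assigns each output column a
/// unique alias, applies the resulting project, and re-initializes the analyzer with the
/// projected columns.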
void DAGQueryBlockInterpreter::executeSourceProjection(DAGPipeline & pipeline, const tipb::Projection & projection)
{
    std::vector<NameAndTypePair> input_columns;
    pipeline.streams = input_streams_vec[0];
    for (auto const & p : pipeline.firstStream()->getHeader().getNamesAndTypesList())
        input_columns.emplace_back(p.name, p.type);
    DAGExpressionAnalyzer dag_analyzer(std::move(input_columns), context);
    ExpressionActionsChain chain;
    dag_analyzer.initChain(chain, dag_analyzer.getCurrentInputColumns());
    ExpressionActionsChain::Step & last_step = chain.steps.back();
    std::vector<NameAndTypePair> output_columns;
    NamesWithAliases project_cols;
    UniqueNameGenerator unique_name_generator;
    for (const auto & expr : projection.exprs())
    {
        auto expr_name = dag_analyzer.getActions(expr, last_step.actions);
        last_step.required_output.emplace_back(expr_name);
        const auto & col = last_step.actions->getSampleBlock().getByName(expr_name);
        String alias = unique_name_generator.toUniqueName(col.name);
        output_columns.emplace_back(alias, col.type);
        project_cols.emplace_back(col.name, alias);
    }
    pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions(), taskLogger()); });
    executeProject(pipeline, project_cols);
    analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(output_columns), context);
}

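/// Applies the extra (timezone/duration) cast and the selection to local streams; for
/// streams from a cop read (CoprocessorBlockInputStream), only the remote-read projection
/// is applied, since the cast and selection were already done in the remote TiFlash.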
void DAGQueryBlockInterpreter::executeExtraCastAndSelection(
    DAGPipeline & pipeline,
    const ExpressionActionsPtr & extra_cast,
    const NamesWithAliases & project_after_ts_and_filter_for_remote_read,
    const ExpressionActionsPtr & before_where,
    const ExpressionActionsPtr & project_after_where,
    const String & filter_column_name)
{
    /// execute timezone cast and the selection
    ExpressionActionsPtr project_for_cop_read;
    for (auto & stream : pipeline.streams)
    {
        if (dynamic_cast<CoprocessorBlockInputStream *>(stream.get()) != nullptr)
        {
            /// for cop read, just execute the project is enough, because timezone cast and the selection are already done in remote TiFlash
            if (!project_after_ts_and_filter_for_remote_read.empty())
            {
                if (project_for_cop_read == nullptr)
                {
                    project_for_cop_read = generateProjectExpressionActions(stream, context, project_after_ts_and_filter_for_remote_read);
                }
                stream = std::make_shared<ExpressionBlockInputStream>(stream, project_for_cop_read, taskLogger());
            }
        }
        else
        {
            /// execute timezone cast or duration cast if needed
            if (extra_cast)
                stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, taskLogger());
            /// execute selection if needed
            if (before_where)
            {
                stream = std::make_shared<FilterBlockInputStream>(stream, before_where, filter_column_name, taskLogger());
                if (project_after_where)
                    stream = std::make_shared<ExpressionBlockInputStream>(stream, project_after_where, taskLogger());
            }
        }
    }
    for (auto & stream : pipeline.streams_with_non_joined_data)
    {
        /// execute selection if needed
        if (before_where)
        {
            stream = std::make_shared<FilterBlockInputStream>(stream, before_where, filter_column_name, taskLogger());
            if (project_after_where)
                stream = std::make_shared<ExpressionBlockInputStream>(stream, project_after_where, taskLogger());
        }
    }
}

// To execute a query block, you have to:
// 1. generate the data stream and push it to pipeline.
// 2. assign the analyzer
@@ -940,51 +1039,12 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
    }
    else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
    {
        auto it = exchange_receiver_map.find(query_block.source_name);
        if (unlikely(it == exchange_receiver_map.end()))
            throw Exception("Can not find exchange receiver for " + query_block.source_name, ErrorCodes::LOGICAL_ERROR);
        // todo choose a more reasonable stream number
        for (size_t i = 0; i < max_streams; i++)
        {
            BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, taskLogger());
            dagContext().getRemoteInputStreams().push_back(stream);
            stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, taskLogger());
            pipeline.streams.push_back(stream);
        }
        std::vector<NameAndTypePair> source_columns;
        Block block = pipeline.firstStream()->getHeader();
        for (const auto & col : block.getColumnsWithTypeAndName())
        {
            source_columns.emplace_back(NameAndTypePair(col.name, col.type));
        }
        analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
        executeExchangeReceiver(pipeline);
        recordProfileStreams(pipeline, query_block.source_name);
    }
    else if (query_block.source->tp() == tipb::ExecType::TypeProjection)
    {
        std::vector<NameAndTypePair> input_columns;
        pipeline.streams = input_streams_vec[0];
        for (auto const & p : pipeline.firstStream()->getHeader().getNamesAndTypesList())
            input_columns.emplace_back(p.name, p.type);
        DAGExpressionAnalyzer dag_analyzer(std::move(input_columns), context);
        ExpressionActionsChain chain;
        dag_analyzer.initChain(chain, dag_analyzer.getCurrentInputColumns());
        ExpressionActionsChain::Step & last_step = chain.steps.back();
        std::vector<NameAndTypePair> output_columns;
        NamesWithAliases project_cols;
        UniqueNameGenerator unique_name_generator;
        for (const auto & expr : query_block.source->projection().exprs())
        {
            auto expr_name = dag_analyzer.getActions(expr, last_step.actions);
            last_step.required_output.emplace_back(expr_name);
            const auto & col = last_step.actions->getSampleBlock().getByName(expr_name);
            String alias = unique_name_generator.toUniqueName(col.name);
            output_columns.emplace_back(alias, col.type);
            project_cols.emplace_back(col.name, alias);
        }
        pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions(), taskLogger()); });
        executeProject(pipeline, project_cols);
        analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(output_columns), context);
        executeSourceProjection(pipeline, query_block.source->projection());
        recordProfileStreams(pipeline, query_block.source_name);
    }
    else
@@ -1005,55 +1065,21 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)

    if (res.extra_cast || res.before_where)
    {
        /// execute timezone cast and the selection
        ExpressionActionsPtr project_for_cop_read;
        for (auto & stream : pipeline.streams)
        {
            if (dynamic_cast<CoprocessorBlockInputStream *>(stream.get()) != nullptr)
            {
                /// for cop read, just execute the project is enough, because timezone cast and the selection are already done in remote TiFlash
                if (!res.project_after_ts_and_filter_for_remote_read.empty())
                {
                    if (project_for_cop_read == nullptr)
                    {
                        project_for_cop_read = generateProjectExpressionActions(stream, context, res.project_after_ts_and_filter_for_remote_read);
                    }
                    stream = std::make_shared<ExpressionBlockInputStream>(stream, project_for_cop_read, taskLogger());
                }
            }
            else
            {
                /// execute timezone cast or duration cast if needed
                if (res.extra_cast)
                    stream = std::make_shared<ExpressionBlockInputStream>(stream, res.extra_cast, taskLogger());
                /// execute selection if needed
                if (res.before_where)
                {
                    stream = std::make_shared<FilterBlockInputStream>(stream, res.before_where, res.filter_column_name, taskLogger());
                    if (res.project_after_where)
                        stream = std::make_shared<ExpressionBlockInputStream>(stream, res.project_after_where, taskLogger());
                }
            }
        }
        for (auto & stream : pipeline.streams_with_non_joined_data)
        {
            /// execute selection if needed
            if (res.before_where)
            {
                stream = std::make_shared<FilterBlockInputStream>(stream, res.before_where, res.filter_column_name, taskLogger());
                if (res.project_after_where)
                    stream = std::make_shared<ExpressionBlockInputStream>(stream, res.project_after_where, taskLogger());
            }
        }
        executeExtraCastAndSelection(
            pipeline,
            res.extra_cast,
            res.project_after_ts_and_filter_for_remote_read,
            res.before_where,
            res.project_after_where,
            res.filter_column_name);
    }
    if (res.before_where)
    {
        recordProfileStreams(pipeline, query_block.selection_name);
    }

    // this log measures the concurrent degree in this mpp task
    LOG_INFO(log,
             "execution stream size for query block(before aggregation) " << query_block.qb_column_prefix << " is " << pipeline.streams.size());
    LOG_INFO(log, "execution stream size for query block(before aggregation) " << query_block.qb_column_prefix << " is " << pipeline.streams.size());

    dagContext().final_concurrency = std::max(dagContext().final_concurrency, pipeline.streams.size());

9 changes: 9 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
@@ -52,6 +52,15 @@ class DAGQueryBlockInterpreter
        bool is_right_out_join,
        const google::protobuf::RepeatedPtrField<tipb::Expr> & filters,
        String & filter_column_name);
    void executeExchangeReceiver(DAGPipeline & pipeline);
    void executeSourceProjection(DAGPipeline & pipeline, const tipb::Projection & projection);
    void executeExtraCastAndSelection(
        DAGPipeline & pipeline,
        const ExpressionActionsPtr & extra_cast,
        const NamesWithAliases & project_after_ts_and_filter_for_remote_read,
        const ExpressionActionsPtr & before_where,
        const ExpressionActionsPtr & project_after_where,
        const String & filter_column_name);
    ExpressionActionsPtr genJoinOtherConditionAction(
        const tipb::Join & join,
        std::vector<NameAndTypePair> & source_columns,
