Skip to content

Commit

Permalink
tmp save
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed Jun 13, 2022
1 parent d31b310 commit 472dd1c
Show file tree
Hide file tree
Showing 17 changed files with 375 additions and 9 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap
return profile_streams_map;
}

void DAGContext::updateFinalConcurrency(size_t cur_streams_size, size_t max_streams)
{
final_concurrency = std::min(std::max(final_concurrency, cur_streams_size), max_streams);
}

void DAGContext::initExecutorIdToJoinIdMap()
{
// only mpp task has join executor
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ class DAGContext
return sql_mode & f;
}

void updateFinalConcurrency(size_t cur_streams_size, size_t max_streams);

bool isTest() const { return is_test; }
void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
ColumnsWithTypeAndName columnsForTest(String executor_id);
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable

const Context & getContext() const { return context; }

void reset(const std::vector<NameAndTypePair> & source_columns_)
{
source_columns = source_columns_;
prepared_sets.clear();
}

const std::vector<NameAndTypePair> & getCurrentInputColumns() const;

DAGPreparedSets & getPreparedSets() { return prepared_sets; }
Expand Down Expand Up @@ -153,13 +159,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const tipb::Window & window,
const size_t window_columns_start_index);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
NamesAndTypes buildOrderColumns(
const ExpressionActionsPtr & actions,
const ::google::protobuf::RepeatedPtrField<tipb::ByItem> & order_by);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
void appendCastAfterAgg(
const ExpressionActionsPtr & actions,
const tipb::Aggregation & agg);
Expand Down
7 changes: 2 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,10 +548,7 @@ void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const ti
output_columns.emplace_back(alias, col.type);
project_cols.emplace_back(col.name, alias);
}
pipeline.transform([&](auto & stream) {
stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions(), log->identifier());
stream->setExtraInfo("before projection");
});
executeExpression(pipeline, chain.getLastActions(), log, "before projection");
executeProject(pipeline, project_cols, "projection");
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(output_columns), context);
}
Expand Down Expand Up @@ -663,7 +660,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
"execution stream size for query block(before aggregation) {} is {}",
query_block.qb_column_prefix,
pipeline.streams.size());
dagContext().final_concurrency = std::min(std::max(dagContext().final_concurrency, pipeline.streams.size()), max_streams);
dagContext().updateFinalConcurrency(pipeline.streams.size(), max_streams);

if (res.before_aggregation)
{
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/SharedQueryBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
Expand Down Expand Up @@ -102,4 +103,19 @@ ExpressionActionsPtr generateProjectExpressionActions(
project->add(ExpressionAction::project(project_cols));
return project;
}

void executeExpression(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expr_actions,
const LoggerPtr & log,
const String & extra_info = "")
{
if (expr_actions && !expr_actions->getActions().empty())
{
pipeline.transform([&](auto & stream) {
stream = std::make_shared<ExpressionBlockInputStream>(stream, expr_actions, log->identifier());
stream->setExtraInfo(extra_info);
});
}
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ ExpressionActionsPtr generateProjectExpressionActions(
const BlockInputStreamPtr & stream,
const Context & context,
const NamesWithAliases & project_cols);

void executeExpression(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expr_actions,
const LoggerPtr & log,
const String & extra_info = "");
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/FmtUtils.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -69,5 +70,7 @@ void PhysicalPlan::transform(DAGPipeline & pipeline, Context & context, size_t m
{
transformImpl(pipeline, context, max_streams);
recordProfileStreams(pipeline, context);
context.getDAGContext()->updateFinalConcurrency(pipeline.streams.size(), max_streams);
restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log);
}
} // namespace DB
33 changes: 33 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,44 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Planner/PhysicalPlanBuilder.h>
#include <Flash/Planner/plans/PhysicalLimit.h>
#include <Flash/Planner/plans/PhysicalTopN.h>
#include <Flash/Planner/plans/PhysicalSource.h>
#include <Interpreters/Context.h>

namespace DB
{
void PhysicalPlanBuilder::build(const String & executor_id, const tipb::Executor * executor)
{
assert(executor);
switch (executor->tp())
{
case tipb::ExecType::TypeLimit:
cur_plans.push_back(PhysicalLimit::build(executor_id, log->identifier(), executor->limit(), popBack()));
break;
case tipb::ExecType::TypeTopN:
cur_plans.push_back(PhysicalTopN::build(context, executor_id, log->identifier(), executor->topn(), popBack()));
break;
default:
throw TiFlashException(fmt::format("{} executor is not supported", executor->tp()), Errors::Planner::Unimplemented);
}
}

DAGContext & PhysicalPlanBuilder::dagContext() const
{
return *context.getDAGContext();
}

PhysicalPlanPtr PhysicalPlanBuilder::popBack()
{
RUNTIME_ASSERT(!cur_plans.empty(), log, "cur_plans is empty, cannot popBack");
PhysicalPlanPtr back = cur_plans.back();
cur_plans.pop_back();
return back;
}

void PhysicalPlanBuilder::buildSource(const Block & sample_block)
{
cur_plans.push_back(PhysicalSource::build(sample_block, log->identifier()));
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Flash/Planner/PhysicalPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/Logger.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <common/logger_useful.h>
#include <tipb/executor.pb.h>

namespace DB
{
Expand All @@ -29,6 +30,8 @@ class PhysicalPlanBuilder
, log(Logger::get("PhysicalPlanBuilder", req_id))
{}

void build(const String & executor_id, const tipb::Executor * executor);

void buildSource(const Block & sample_block);

PhysicalPlanPtr getResult() const
Expand All @@ -37,10 +40,15 @@ class PhysicalPlanBuilder
return cur_plans.back();
}

private:
PhysicalPlanPtr popBack();

DAGContext & dagContext() const;

private:
std::vector<PhysicalPlanPtr> cur_plans;

[[maybe_unused]] Context & context;
Context & context;

LoggerPtr log;
};
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ Names schemaToNames(const NamesAndTypes & schema)
names.push_back(column.name);
return names;
}

ExpressionActionsPtr newActions(const Block & input_block, const Context & context)
{
const ColumnsWithTypeAndName & actions_input_columns = input_block.getColumnsWithTypeAndName();
return std::make_shared<ExpressionActions>(actions_input_columns, context.getSettingsRef());
}
} // namespace DB::PhysicalPlanHelper
5 changes: 5 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

#pragma once

#include <Core/Block.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>

namespace DB::PhysicalPlanHelper
{
Names schemaToNames(const NamesAndTypes & schema);

ExpressionActionsPtr newActions(const Block & input_block, const Context & context);
} // namespace DB::PhysicalPlanHelper
4 changes: 4 additions & 0 deletions dbms/src/Flash/Planner/PlanType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ String PlanType::toString() const
{
case Source:
return "Source";
case Limit:
return "Limit";
case TopN:
return "TopN";
default:
throw TiFlashException("Unknown PlanType", Errors::Planner::Internal);
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/PlanType.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ struct PlanType
enum PlanTypeEnum
{
Source = 0,
Limit = 1,
TopN = 2,
};
PlanTypeEnum enum_value;

Expand Down
58 changes: 58 additions & 0 deletions dbms/src/Flash/Planner/plans/PhysicalLimit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Planner/plans/PhysicalLimit.h>
#include <Interpreters/Context.h>

namespace DB
{
PhysicalPlanPtr PhysicalLimit::build(
const String & executor_id,
const String & req_id,
const tipb::Limit & limit,
PhysicalPlanPtr child)
{
assert(child);
auto physical_limit = std::make_shared<PhysicalLimit>(executor_id, child->getSchema(), req_id, limit.limit());
physical_limit->appendChild(child);
return physical_limit;
}

void PhysicalLimit::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams)
{
child->transform(pipeline, context, max_streams);

pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, log->identifier(), false); });
if (pipeline.hasMoreThanOneStream())
{
executeUnion(pipeline, max_streams, log, false, "for partial limit");
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, log->identifier(), false); });
}
}

void PhysicalLimit::finalize(const Names & parent_require)
{
child->finalize(parent_require);
}

const Block & PhysicalLimit::getSampleBlock() const
{
return child->getSampleBlock();
}
} // namespace DB
49 changes: 49 additions & 0 deletions dbms/src/Flash/Planner/plans/PhysicalLimit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Planner/plans/PhysicalUnary.h>
#include <tipb/executor.pb.h>

namespace DB
{
class PhysicalLimit : public PhysicalUnary
{
public:
static PhysicalPlanPtr build(
const String & executor_id,
const String & req_id,
const tipb::Limit & limit,
PhysicalPlanPtr child);

PhysicalLimit(
const String & executor_id_,
const NamesAndTypes & schema_,
const String & req_id,
size_t limit_)
: PhysicalUnary(executor_id_, PlanType::Limit, schema_, req_id)
, limit(limit_)
{}

void finalize(const Names & parent_require) override;

const Block & getSampleBlock() const override;

private:
void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override;

size_t limit;
}
}
Loading

0 comments on commit 472dd1c

Please sign in to comment.