Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine handleJoin #4722

Merged
merged 46 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e99dac6
tiflash_join
SeaRise Apr 20, 2022
204d6cb
move genJoinOtherConditionAction and prepareJoin to helper
SeaRise Apr 20, 2022
bf3fad8
fix
SeaRise Apr 20, 2022
edce58a
f
SeaRise Apr 20, 2022
9ebf44c
remove useless func
SeaRise Apr 20, 2022
37be36e
refine INterpreter/join
SeaRise Apr 20, 2022
8dbbac0
fix
SeaRise Apr 20, 2022
157bfbb
fix
SeaRise Apr 21, 2022
e3260f2
update
SeaRise Apr 21, 2022
3b7b252
reverse
SeaRise Apr 21, 2022
b37c597
y
SeaRise Apr 21, 2022
026593b
fix
SeaRise Apr 21, 2022
99b774c
fix
SeaRise Apr 21, 2022
ccd9321
update
SeaRise Apr 21, 2022
aaee372
fix
SeaRise Apr 21, 2022
ad28042
u
SeaRise Apr 21, 2022
62b934b
fix tidy warn
SeaRise Apr 21, 2022
2ee4d14
u
SeaRise Apr 22, 2022
fc31bd2
Merge branch 'master' into refine_interpreter_join
SeaRise Apr 22, 2022
97b660b
Merge branch 'master' into refine_interpreter_join
SeaRise Apr 24, 2022
39a3c08
fix
SeaRise Apr 24, 2022
51c9a41
f
SeaRise Apr 24, 2022
697380a
assert
SeaRise Apr 24, 2022
fc7e531
Merge branch 'master' into refine_interpreter_join
SeaRise Apr 25, 2022
1065439
address comments
SeaRise Apr 25, 2022
a9bc03d
Merge branch 'master' into refine_interpreter_join
SeaRise Apr 26, 2022
301a9ad
address comments
SeaRise Apr 26, 2022
2c1ea82
Update dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
SeaRise May 5, 2022
f841dc5
Merge branch 'master' into refine_interpreter_join
SeaRise May 5, 2022
7616b00
Merge branch 'master' into refine_interpreter_join
SeaRise May 9, 2022
0f234a1
fix
SeaRise May 9, 2022
024f9f8
address comments
SeaRise May 9, 2022
70dec60
address comment
SeaRise May 9, 2022
370677f
add tests
SeaRise May 9, 2022
2f4cea4
fix
SeaRise May 9, 2022
6ef1ed1
fix
SeaRise May 9, 2022
bae536e
address comment
SeaRise May 9, 2022
ec1ce14
Merge branch 'master' into refine_interpreter_join
SeaRise May 9, 2022
96dcc35
Merge branch 'master' into refine_interpreter_join
SeaRise May 10, 2022
4f965f6
fix build fail
SeaRise May 10, 2022
d8da60e
Merge branch 'refine_interpreter_join' of https://github.com/SeaRise/…
SeaRise May 10, 2022
ae6603a
Merge branch 'master' into refine_interpreter_join
SeaRise May 11, 2022
a9e2d62
Merge branch 'master' into refine_interpreter_join
SeaRise May 18, 2022
fb4b83f
Merge branch 'master' into refine_interpreter_join
SeaRise May 23, 2022
2a40af5
fix
SeaRise May 23, 2022
d7852c7
Merge branch 'master' into refine_interpreter_join
SeaRise May 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 60 additions & 269 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp

Large diffs are not rendered by default.

14 changes: 0 additions & 14 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,11 @@ class DAGQueryBlockInterpreter
void handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query);
void prepareJoin(
const google::protobuf::RepeatedPtrField<tipb::Expr> & keys,
const DataTypes & key_types,
DAGPipeline & pipeline,
Names & key_names,
bool left,
bool is_right_out_join,
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters,
String & filter_column_name);
void handleExchangeReceiver(DAGPipeline & pipeline);
void handleMockExchangeReceiver(DAGPipeline & pipeline);
void handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection);
void handleWindow(DAGPipeline & pipeline, const tipb::Window & window);
void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort);
ExpressionActionsPtr genJoinOtherConditionAction(
const tipb::Join & join,
NamesAndTypes & source_columns,
String & filter_column_for_other_condition,
String & filter_column_for_other_eq_condition);
void executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column);
void executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr);
void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc);
Expand Down
356 changes: 356 additions & 0 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/getLeastSupertype.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Interpreters/Context.h>
#include <Interpreters/Join.h>
#include <Storages/Transaction/TypeMapping.h>
#include <fmt/format.h>

#include <unordered_map>

namespace DB::JoinInterpreterHelper
{
namespace
{
std::pair<ASTTableJoin::Kind, size_t> getJoinKindAndBuildSideIndex(const tipb::Join & join)
{
static const std::unordered_map<tipb::JoinType, ASTTableJoin::Kind> equal_join_type_map{
{tipb::JoinType::TypeInnerJoin, ASTTableJoin::Kind::Inner},
{tipb::JoinType::TypeLeftOuterJoin, ASTTableJoin::Kind::Left},
{tipb::JoinType::TypeRightOuterJoin, ASTTableJoin::Kind::Right},
{tipb::JoinType::TypeSemiJoin, ASTTableJoin::Kind::Inner},
{tipb::JoinType::TypeAntiSemiJoin, ASTTableJoin::Kind::Anti},
{tipb::JoinType::TypeLeftOuterSemiJoin, ASTTableJoin::Kind::LeftSemi},
{tipb::JoinType::TypeAntiLeftOuterSemiJoin, ASTTableJoin::Kind::LeftAnti}};
static const std::unordered_map<tipb::JoinType, ASTTableJoin::Kind> cartesian_join_type_map{
{tipb::JoinType::TypeInnerJoin, ASTTableJoin::Kind::Cross},
{tipb::JoinType::TypeLeftOuterJoin, ASTTableJoin::Kind::Cross_Left},
{tipb::JoinType::TypeRightOuterJoin, ASTTableJoin::Kind::Cross_Right},
{tipb::JoinType::TypeSemiJoin, ASTTableJoin::Kind::Cross},
{tipb::JoinType::TypeAntiSemiJoin, ASTTableJoin::Kind::Cross_Anti},
{tipb::JoinType::TypeLeftOuterSemiJoin, ASTTableJoin::Kind::Cross_LeftSemi},
{tipb::JoinType::TypeAntiLeftOuterSemiJoin, ASTTableJoin::Kind::Cross_LeftAnti}};

const auto & join_type_map = join.left_join_keys_size() == 0 ? cartesian_join_type_map : equal_join_type_map;
auto join_type_it = join_type_map.find(join.join_type());
if (unlikely(join_type_it == join_type_map.end()))
throw TiFlashException("Unknown join type in dag request", Errors::Coprocessor::BadRequest);

ASTTableJoin::Kind kind = join_type_it->second;

/// in DAG request, inner part is the build side, however for TiFlash implementation,
/// the build side must be the right side, so need to swap the join side if needed
/// 1. for (cross) inner join, there is no problem in this swap.
/// 2. for (cross) semi/anti-semi join, the build side is always right, needn't swap.
/// 3. for non-cross left/right join, there is no problem in this swap.
/// 4. for cross left join, the build side is always right, needn't and can't swap.
/// 5. for cross right join, the build side is always left, so it will always swap and change to cross left join.
/// note that whatever the build side is, we can't support cross-right join now.

size_t build_side_index = 0;
switch (kind)
{
case ASTTableJoin::Kind::Cross_Right:
build_side_index = 0;
break;
case ASTTableJoin::Kind::Cross_Left:
build_side_index = 1;
break;
default:
build_side_index = join.inner_idx();
}
assert(build_side_index == 0 || build_side_index == 1);

// should swap join side.
if (build_side_index != 1)
{
switch (kind)
{
case ASTTableJoin::Kind::Left:
kind = ASTTableJoin::Kind::Right;
break;
case ASTTableJoin::Kind::Right:
kind = ASTTableJoin::Kind::Left;
break;
case ASTTableJoin::Kind::Cross_Right:
kind = ASTTableJoin::Kind::Cross_Left;
default:; // just `default`, for other kinds, don't need to change kind.
}
}

return {kind, build_side_index};
}

DataTypes getJoinKeyTypes(const tipb::Join & join)
{
if (unlikely(join.left_join_keys_size() != join.right_join_keys_size()))
throw TiFlashException("size of join.left_join_keys != size of join.right_join_keys", Errors::Coprocessor::BadRequest);
DataTypes key_types;
for (int i = 0; i < join.left_join_keys_size(); ++i)
{
if (unlikely(!exprHasValidFieldType(join.left_join_keys(i)) || !exprHasValidFieldType(join.right_join_keys(i))))
throw TiFlashException("Join key without field type", Errors::Coprocessor::BadRequest);
DataTypes types;
types.emplace_back(getDataTypeByFieldTypeForComputingLayer(join.left_join_keys(i).field_type()));
types.emplace_back(getDataTypeByFieldTypeForComputingLayer(join.right_join_keys(i).field_type()));
DataTypePtr common_type = getLeastSupertype(types);
key_types.emplace_back(common_type);
}
return key_types;
}

TiDB::TiDBCollators getJoinKeyCollators(const tipb::Join & join, const DataTypes & join_key_types)
{
TiDB::TiDBCollators collators;
size_t join_key_size = join_key_types.size();
if (join.probe_types_size() == static_cast<int>(join_key_size) && join.build_types_size() == join.probe_types_size())
for (size_t i = 0; i < join_key_size; ++i)
{
if (removeNullable(join_key_types[i])->isString())
{
if (unlikely(join.probe_types(i).collate() != join.build_types(i).collate()))
throw TiFlashException("Join with different collators on the join key", Errors::Coprocessor::BadRequest);
collators.push_back(getCollatorFromFieldType(join.probe_types(i)));
}
else
collators.push_back(nullptr);
}
return collators;
}

std::tuple<ExpressionActionsPtr, String, String> doGenJoinOtherConditionAction(
const Context & context,
const tipb::Join & join,
const NamesAndTypes & source_columns)
{
if (join.other_conditions_size() == 0 && join.other_eq_conditions_from_in_size() == 0)
return {nullptr, "", ""};

DAGExpressionAnalyzer dag_analyzer(source_columns, context);
ExpressionActionsChain chain;

String filter_column_for_other_condition;
if (join.other_conditions_size() > 0)
{
std::vector<const tipb::Expr *> condition_vector;
for (const auto & c : join.other_conditions())
{
condition_vector.push_back(&c);
}
filter_column_for_other_condition = dag_analyzer.appendWhere(chain, condition_vector);
}

String filter_column_for_other_eq_condition;
if (join.other_eq_conditions_from_in_size() > 0)
{
std::vector<const tipb::Expr *> condition_vector;
for (const auto & c : join.other_eq_conditions_from_in())
{
condition_vector.push_back(&c);
}
filter_column_for_other_eq_condition = dag_analyzer.appendWhere(chain, condition_vector);
}

return {chain.getLastActions(), std::move(filter_column_for_other_condition), std::move(filter_column_for_other_eq_condition)};
}
} // namespace

TiFlashJoin::TiFlashJoin(const tipb::Join & join_) // NOLINT(cppcoreguidelines-pro-type-member-init)
: join(join_)
, join_key_types(getJoinKeyTypes(join_))
, join_key_collators(getJoinKeyCollators(join_, join_key_types))
{
std::tie(kind, build_side_index) = getJoinKindAndBuildSideIndex(join);
strictness = isSemiJoin() ? ASTTableJoin::Strictness::Any : ASTTableJoin::Strictness::All;
}

String TiFlashJoin::genMatchHelperName(const Block & header1, const Block & header2) const
{
if (!isLeftSemiFamily())
{
return "";
}

size_t i = 0;
String match_helper_name = fmt::format("{}{}", Join::match_helper_prefix, i);
while (header1.has(match_helper_name) || header2.has(match_helper_name))
{
match_helper_name = fmt::format("{}{}", Join::match_helper_prefix, ++i);
}
return match_helper_name;
}

NamesAndTypes TiFlashJoin::genColumnsForOtherJoinFilter(
const Block & left_input_header,
const Block & right_input_header,
const ExpressionActionsPtr & probe_prepare_join_actions) const
{
#ifndef NDEBUG
auto is_prepare_actions_valid = [](const Block & origin_block, const ExpressionActionsPtr & prepare_actions) {
const Block & prepare_sample_block = prepare_actions->getSampleBlock();
for (const auto & p : origin_block)
{
if (!prepare_sample_block.has(p.name))
return false;
}
return true;
};
if (unlikely(!is_prepare_actions_valid(build_side_index == 1 ? left_input_header : right_input_header, probe_prepare_join_actions)))
{
throw TiFlashException("probe_prepare_join_actions isn't valid", Errors::Coprocessor::Internal);
}
#endif

/// columns_for_other_join_filter is a vector of columns used
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
/// as the input columns when compiling other join filter.
/// Note the order in the column vector is very important:
/// first the columns in left_input_header, then followed
/// by the columns in right_input_header, if there are other
/// columns generated before compile other join filter, then
/// append the extra columns afterwards. In order to figure out
/// whether a given column is already in the column vector or
/// not quickly, we use another set to store the column names.

/// The order of columns must be {left_input, right_input, extra columns},
/// because tidb requires the input schema of join to be {left_input, right_input}.
/// Extra columns are appended to prevent extra columns from being repeatedly generated.

NamesAndTypes columns_for_other_join_filter;
std::unordered_set<String> column_set_for_origin_columns;

auto append_origin_columns = [&columns_for_other_join_filter, &column_set_for_origin_columns](const Block & header, bool make_nullable) {
for (const auto & p : header)
{
columns_for_other_join_filter.emplace_back(p.name, make_nullable ? makeNullable(p.type) : p.type);
column_set_for_origin_columns.emplace(p.name);
}
};
append_origin_columns(left_input_header, join.join_type() == tipb::JoinType::TypeRightOuterJoin);
append_origin_columns(right_input_header, join.join_type() == tipb::JoinType::TypeLeftOuterJoin);

/// append the columns generated by probe side prepare join actions.
/// the new columns are
/// - filter_column and related temporary columns
/// - join keys and related temporary columns
auto append_new_columns = [&columns_for_other_join_filter, &column_set_for_origin_columns](const Block & header, bool make_nullable) {
for (const auto & p : header)
{
if (column_set_for_origin_columns.find(p.name) == column_set_for_origin_columns.end())
columns_for_other_join_filter.emplace_back(p.name, make_nullable ? makeNullable(p.type) : p.type);
}
};
bool make_nullable = build_side_index == 1
? join.join_type() == tipb::JoinType::TypeRightOuterJoin
: join.join_type() == tipb::JoinType::TypeLeftOuterJoin;
append_new_columns(probe_prepare_join_actions->getSampleBlock(), make_nullable);

return columns_for_other_join_filter;
}

/// all the columns from build side streams should be added after join, even for the join key.
NamesAndTypesList TiFlashJoin::genColumnsAddedByJoin(
const Block & build_side_header,
const String & match_helper_name) const
{
NamesAndTypesList columns_added_by_join;
bool make_nullable = isTiFlashLeftJoin();
for (auto const & p : build_side_header)
{
columns_added_by_join.emplace_back(p.name, make_nullable ? makeNullable(p.type) : p.type);
}
if (!match_helper_name.empty())
{
columns_added_by_join.emplace_back(match_helper_name, Join::match_helper_type);
}
return columns_added_by_join;
}

NamesAndTypes TiFlashJoin::genJoinOutputColumns(
const Block & left_input_header,
const Block & right_input_header,
const String & match_helper_name) const
{
NamesAndTypes join_output_columns;
auto append_output_columns = [&join_output_columns](const Block & header, bool make_nullable) {
for (auto const & p : header)
{
join_output_columns.emplace_back(p.name, make_nullable ? makeNullable(p.type) : p.type);
}
};

append_output_columns(left_input_header, join.join_type() == tipb::JoinType::TypeRightOuterJoin);
if (!isSemiJoin())
{
/// for semi join, the columns from right table will be ignored
append_output_columns(right_input_header, join.join_type() == tipb::JoinType::TypeLeftOuterJoin);
}

if (!match_helper_name.empty())
{
join_output_columns.emplace_back(match_helper_name, Join::match_helper_type);
}

return join_output_columns;
}

std::tuple<ExpressionActionsPtr, String, String> TiFlashJoin::genJoinOtherConditionAction(
const Context & context,
const Block & left_input_header,
const Block & right_input_header,
const ExpressionActionsPtr & probe_side_prepare_join) const
{
auto columns_for_other_join_filter
= genColumnsForOtherJoinFilter(
left_input_header,
right_input_header,
probe_side_prepare_join);

return doGenJoinOtherConditionAction(context, join, columns_for_other_join_filter);
}

std::tuple<ExpressionActionsPtr, Names, String> prepareJoin(
const Context & context,
const Block & input_header,
const google::protobuf::RepeatedPtrField<tipb::Expr> & keys,
const DataTypes & key_types,
bool left,
bool is_right_out_join,
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters)
{
NamesAndTypes source_columns;
for (auto const & p : input_header)
source_columns.emplace_back(p.name, p.type);
DAGExpressionAnalyzer dag_analyzer(std::move(source_columns), context);
ExpressionActionsChain chain;
Names key_names;
String filter_column_name;
dag_analyzer.appendJoinKeyAndJoinFilters(chain, keys, key_types, key_names, left, is_right_out_join, filters, filter_column_name);
return {chain.getLastActions(), std::move(key_names), std::move(filter_column_name)};
}

std::function<size_t()> concurrencyBuildIndexGenerator(size_t join_build_concurrency)
{
size_t init_value = 0;
return [init_value, join_build_concurrency]() mutable {
return (init_value++) % join_build_concurrency;
};
}
} // namespace DB::JoinInterpreterHelper
Loading