Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance join test framework with conditional expressions #5543

Merged
merged 12 commits into from
Aug 8, 2022
58 changes: 52 additions & 6 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ void Join::columnPrune(std::unordered_set<String> & used_columns)
right_used_columns.emplace(s);
}

for (const auto & child : using_expr_list->children)
for (const auto & child : join_cols)
{
if (auto * identifier = typeid_cast<ASTIdentifier *>(child.get()))
{
Expand Down Expand Up @@ -1300,12 +1300,39 @@ bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, c
join->set_join_exec_type(tipb::JoinExecType::TypeHashJoin);
join->set_inner_idx(1);

for (auto & key : using_expr_list->children)
for (const auto & key : join_cols)
{
fillJoinKeyAndFieldType(key, children[0]->output_schema, join->add_left_join_keys(), join->add_probe_types(), collator_id);
fillJoinKeyAndFieldType(key, children[1]->output_schema, join->add_right_join_keys(), join->add_build_types(), collator_id);
}

for (const auto & expr : left_conds)
{
tipb::Expr * cond = join->add_left_conditions();
astToPB(children[0]->output_schema, expr, cond, collator_id, context);
}

for (const auto & expr : right_conds)
{
tipb::Expr * cond = join->add_right_conditions();
astToPB(children[1]->output_schema, expr, cond, collator_id, context);
}

DAGSchema merged_children_schema{children[0]->output_schema};
merged_children_schema.insert(merged_children_schema.end(), children[1]->output_schema.begin(), children[1]->output_schema.end());

for (const auto & expr : other_conds)
{
tipb::Expr * cond = join->add_other_conditions();
astToPB(merged_children_schema, expr, cond, collator_id, context);
}

for (const auto & expr : other_eq_conds_from_in)
{
tipb::Expr * cond = join->add_other_eq_conditions_from_in();
astToPB(merged_children_schema, expr, cond, collator_id, context);
}

auto * left_child_executor = join->add_children();
children[0]->toTiPBExecutor(left_child_executor, collator_id, mpp_info, context);
auto * right_child_executor = join->add_children();
Expand Down Expand Up @@ -1342,7 +1369,7 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie
}
};

for (auto & key : using_expr_list->children)
for (const auto & key : join_cols)
{
push_back_partition_key(left_partition_keys, children[0]->output_schema, key);
push_back_partition_key(right_partition_keys, children[1]->output_schema, key);
Expand Down Expand Up @@ -1691,14 +1718,23 @@ static void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right
}
}

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, ASTPtr using_expr_list)
// compileJoin constructs a mocked Join executor node, note that all conditional expression params can be default
ExecutorPtr compileJoin(size_t & executor_index,
ExecutorPtr left,
ExecutorPtr right,
tipb::JoinType tp,
const ASTs & join_cols,
const ASTs & left_conds,
const ASTs & right_conds,
const ASTs & other_conds,
const ASTs & other_eq_conds_from_in)
{
DAGSchema output_schema;

buildLeftSideJoinSchema(output_schema, left->output_schema, tp);
buildRightSideJoinSchema(output_schema, right->output_schema, tp);

auto join = std::make_shared<mock::Join>(executor_index, output_schema, tp, using_expr_list);
auto join = std::make_shared<mock::Join>(executor_index, output_schema, tp, join_cols, left_conds, right_conds, other_conds, other_eq_conds_from_in);
join->children.push_back(left);
join->children.push_back(right);

Expand All @@ -1723,7 +1759,17 @@ ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr r
default:
throw Exception("Unsupported join type");
}
return compileJoin(executor_index, left, right, tp, ast_join.using_expression_list);

// in legacy test framework, we only support using_expr of join
ASTs join_cols;
if (ast_join.using_expression_list)
{
for (const auto & key : ast_join.using_expression_list->children)
{
join_cols.push_back(key);
}
}
return compileJoin(executor_index, left, right, tp, join_cols);
}

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type)
Expand Down
21 changes: 13 additions & 8 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,22 @@ struct Join : Executor
{
tipb::JoinType tp;

const ASTPtr using_expr_list;
const ASTs join_cols{};
const ASTs left_conds{};
const ASTs right_conds{};
const ASTs other_conds{};
const ASTs other_eq_conds_from_in{};

// todo(ljr): support on expr
const ASTPtr on_expr{};

Join(size_t & index_, const DAGSchema & output_schema_, tipb::JoinType tp_, ASTPtr using_expr_list_)
Join(size_t & index_, const DAGSchema & output_schema_, tipb::JoinType tp_, const ASTs & join_cols_, const ASTs & l_conds, const ASTs & r_conds, const ASTs & o_conds, const ASTs & o_eq_conds)
: Executor(index_, "Join_" + std::to_string(index_), output_schema_)
, tp(tp_)
, using_expr_list(using_expr_list_)
, join_cols(join_cols_)
, left_conds(l_conds)
, right_conds(r_conds)
, other_conds(o_conds)
, other_eq_conds_from_in(o_eq_conds)
{
if (using_expr_list == nullptr)
if (!(join_cols.size() + left_conds.size() + right_conds.size() + other_conds.size() + other_eq_conds_from_in.size()))
throw Exception("No join condition found.");
}

Expand Down Expand Up @@ -359,7 +364,7 @@ ExecutorPtr compileProject(ExecutorPtr input, size_t & executor_index, ASTPtr se
/// avoid using ASTTableJoin.
ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, ASTPtr params);

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, ASTPtr using_expr_list);
ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, const ASTs & join_cols, const ASTs & left_conds = {}, const ASTs & right_conds = {}, const ASTs & other_conds = {}, const ASTs & other_eq_conds_from_in = {});

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/gtest_collation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ try
{
/// Check collation for executors
auto request = context.scan(join_table, "t1")
.join(context.scan(join_table, "t2"), {col("a")}, tipb::JoinType::TypeInnerJoin)
.join(context.scan(join_table, "t2"), tipb::JoinType::TypeInnerJoin, {col("a")})
.aggregation({Max(col("a")), Min(col("a")), Count(col("a"))}, {col("b")})
.build(context);
ASSERT_EQ(checkExecutorCollation(request).size(), 0);
Expand Down
48 changes: 24 additions & 24 deletions dbms/src/Flash/tests/gtest_interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,12 @@ try
auto request = table1.join(
table2.join(
table3.join(table4,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.build(context);

String expected = R"(
Expand Down Expand Up @@ -445,12 +445,12 @@ CreatingSets
auto request = receiver1.join(
receiver2.join(
receiver3.join(receiver4,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.build(context);

String expected = R"(
Expand Down Expand Up @@ -487,12 +487,12 @@ CreatingSets
auto request = receiver1.join(
receiver2.join(
receiver3.join(receiver4,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.exchangeSender(tipb::PassThrough)
.build(context);

Expand Down Expand Up @@ -533,8 +533,8 @@ try

auto request = table1.join(
table2,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.aggregation({Max(col("r_a"))}, {col("join_c")})
.build(context);
String expected = R"(
Expand Down Expand Up @@ -562,8 +562,8 @@ CreatingSets

auto request = table1.join(
table2,
{col("join_c")},
tipb::JoinType::TypeRightOuterJoin)
tipb::JoinType::TypeRightOuterJoin,
{col("join_c")})
.aggregation({Max(col("r_a"))}, {col("join_c")})
.build(context);
String expected = R"(
Expand Down Expand Up @@ -594,8 +594,8 @@ CreatingSets

auto request = receiver1.join(
receiver2,
{col("join_c")},
tipb::JoinType::TypeRightOuterJoin)
tipb::JoinType::TypeRightOuterJoin,
{col("join_c")})
.aggregation({Sum(col("r_a"))}, {col("join_c")})
.exchangeSender(tipb::PassThrough)
.limit(10)
Expand Down
Loading