Skip to content

Commit

Permalink
Enhance join test framework with conditional expressions (#5543)
Browse files Browse the repository at this point in the history
close #5351
  • Loading branch information
Willendless authored Aug 8, 2022
1 parent bcf72e3 commit d8fae61
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 73 deletions.
58 changes: 52 additions & 6 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ void Join::columnPrune(std::unordered_set<String> & used_columns)
right_used_columns.emplace(s);
}

for (const auto & child : using_expr_list->children)
for (const auto & child : join_cols)
{
if (auto * identifier = typeid_cast<ASTIdentifier *>(child.get()))
{
Expand Down Expand Up @@ -1300,12 +1300,39 @@ bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, c
join->set_join_exec_type(tipb::JoinExecType::TypeHashJoin);
join->set_inner_idx(1);

for (auto & key : using_expr_list->children)
for (const auto & key : join_cols)
{
fillJoinKeyAndFieldType(key, children[0]->output_schema, join->add_left_join_keys(), join->add_probe_types(), collator_id);
fillJoinKeyAndFieldType(key, children[1]->output_schema, join->add_right_join_keys(), join->add_build_types(), collator_id);
}

for (const auto & expr : left_conds)
{
tipb::Expr * cond = join->add_left_conditions();
astToPB(children[0]->output_schema, expr, cond, collator_id, context);
}

for (const auto & expr : right_conds)
{
tipb::Expr * cond = join->add_right_conditions();
astToPB(children[1]->output_schema, expr, cond, collator_id, context);
}

DAGSchema merged_children_schema{children[0]->output_schema};
merged_children_schema.insert(merged_children_schema.end(), children[1]->output_schema.begin(), children[1]->output_schema.end());

for (const auto & expr : other_conds)
{
tipb::Expr * cond = join->add_other_conditions();
astToPB(merged_children_schema, expr, cond, collator_id, context);
}

for (const auto & expr : other_eq_conds_from_in)
{
tipb::Expr * cond = join->add_other_eq_conditions_from_in();
astToPB(merged_children_schema, expr, cond, collator_id, context);
}

auto * left_child_executor = join->add_children();
children[0]->toTiPBExecutor(left_child_executor, collator_id, mpp_info, context);
auto * right_child_executor = join->add_children();
Expand Down Expand Up @@ -1342,7 +1369,7 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie
}
};

for (auto & key : using_expr_list->children)
for (const auto & key : join_cols)
{
push_back_partition_key(left_partition_keys, children[0]->output_schema, key);
push_back_partition_key(right_partition_keys, children[1]->output_schema, key);
Expand Down Expand Up @@ -1691,14 +1718,23 @@ static void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right
}
}

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, ASTPtr using_expr_list)
// compileJoin constructs a mocked Join executor node. Note that all conditional-expression parameters are optional and default to empty lists.
ExecutorPtr compileJoin(size_t & executor_index,
ExecutorPtr left,
ExecutorPtr right,
tipb::JoinType tp,
const ASTs & join_cols,
const ASTs & left_conds,
const ASTs & right_conds,
const ASTs & other_conds,
const ASTs & other_eq_conds_from_in)
{
DAGSchema output_schema;

buildLeftSideJoinSchema(output_schema, left->output_schema, tp);
buildRightSideJoinSchema(output_schema, right->output_schema, tp);

auto join = std::make_shared<mock::Join>(executor_index, output_schema, tp, using_expr_list);
auto join = std::make_shared<mock::Join>(executor_index, output_schema, tp, join_cols, left_conds, right_conds, other_conds, other_eq_conds_from_in);
join->children.push_back(left);
join->children.push_back(right);

Expand All @@ -1723,7 +1759,17 @@ ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr r
default:
throw Exception("Unsupported join type");
}
return compileJoin(executor_index, left, right, tp, ast_join.using_expression_list);

// The legacy test framework only supports the USING expression list of a join.
ASTs join_cols;
if (ast_join.using_expression_list)
{
for (const auto & key : ast_join.using_expression_list->children)
{
join_cols.push_back(key);
}
}
return compileJoin(executor_index, left, right, tp, join_cols);
}

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type)
Expand Down
21 changes: 13 additions & 8 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,22 @@ struct Join : Executor
{
tipb::JoinType tp;

const ASTPtr using_expr_list;
const ASTs join_cols{};
const ASTs left_conds{};
const ASTs right_conds{};
const ASTs other_conds{};
const ASTs other_eq_conds_from_in{};

// todo(ljr): support on expr
const ASTPtr on_expr{};

Join(size_t & index_, const DAGSchema & output_schema_, tipb::JoinType tp_, ASTPtr using_expr_list_)
Join(size_t & index_, const DAGSchema & output_schema_, tipb::JoinType tp_, const ASTs & join_cols_, const ASTs & l_conds, const ASTs & r_conds, const ASTs & o_conds, const ASTs & o_eq_conds)
: Executor(index_, "Join_" + std::to_string(index_), output_schema_)
, tp(tp_)
, using_expr_list(using_expr_list_)
, join_cols(join_cols_)
, left_conds(l_conds)
, right_conds(r_conds)
, other_conds(o_conds)
, other_eq_conds_from_in(o_eq_conds)
{
if (using_expr_list == nullptr)
if (!(join_cols.size() + left_conds.size() + right_conds.size() + other_conds.size() + other_eq_conds_from_in.size()))
throw Exception("No join condition found.");
}

Expand Down Expand Up @@ -359,7 +364,7 @@ ExecutorPtr compileProject(ExecutorPtr input, size_t & executor_index, ASTPtr se
/// avoid using ASTTableJoin.
ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, ASTPtr params);

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, ASTPtr using_expr_list);
ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, const ASTs & join_cols, const ASTs & left_conds = {}, const ASTs & right_conds = {}, const ASTs & other_conds = {}, const ASTs & other_eq_conds_from_in = {});

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/gtest_collation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ try
{
/// Check collation for executors
auto request = context.scan(join_table, "t1")
.join(context.scan(join_table, "t2"), {col("a")}, tipb::JoinType::TypeInnerJoin)
.join(context.scan(join_table, "t2"), tipb::JoinType::TypeInnerJoin, {col("a")})
.aggregation({Max(col("a")), Min(col("a")), Count(col("a"))}, {col("b")})
.build(context);
ASSERT_EQ(checkExecutorCollation(request).size(), 0);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/gtest_compute_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ try
{
auto tasks = context
.scan("test_db", "l_table")
.join(context.scan("test_db", "r_table"), {col("join_c")}, tipb::JoinType::TypeLeftOuterJoin)
.join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")})
.topN("join_c", false, 2)
.buildMPPTasks(context);

Expand Down
48 changes: 24 additions & 24 deletions dbms/src/Flash/tests/gtest_interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,12 @@ try
auto request = table1.join(
table2.join(
table3.join(table4,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.build(context);

String expected = R"(
Expand Down Expand Up @@ -445,12 +445,12 @@ CreatingSets
auto request = receiver1.join(
receiver2.join(
receiver3.join(receiver4,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.build(context);

String expected = R"(
Expand Down Expand Up @@ -487,12 +487,12 @@ CreatingSets
auto request = receiver1.join(
receiver2.join(
receiver3.join(receiver4,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin),
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")}),
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.exchangeSender(tipb::PassThrough)
.build(context);

Expand Down Expand Up @@ -533,8 +533,8 @@ try

auto request = table1.join(
table2,
{col("join_c")},
tipb::JoinType::TypeLeftOuterJoin)
tipb::JoinType::TypeLeftOuterJoin,
{col("join_c")})
.aggregation({Max(col("r_a"))}, {col("join_c")})
.build(context);
String expected = R"(
Expand Down Expand Up @@ -562,8 +562,8 @@ CreatingSets

auto request = table1.join(
table2,
{col("join_c")},
tipb::JoinType::TypeRightOuterJoin)
tipb::JoinType::TypeRightOuterJoin,
{col("join_c")})
.aggregation({Max(col("r_a"))}, {col("join_c")})
.build(context);
String expected = R"(
Expand Down Expand Up @@ -594,8 +594,8 @@ CreatingSets

auto request = receiver1.join(
receiver2,
{col("join_c")},
tipb::JoinType::TypeRightOuterJoin)
tipb::JoinType::TypeRightOuterJoin,
{col("join_c")})
.aggregation({Sum(col("r_a"))}, {col("join_c")})
.exchangeSender(tipb::PassThrough)
.limit(10)
Expand Down
Loading

0 comments on commit d8fae61

Please sign in to comment.