Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine get actions in DAGExpressionAnalyzer #282

Merged
merged 1 commit into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ struct ExecutorCtx
{
tipb::Executor * input;
DAGSchema output;
std::unordered_map<String, tipb::Expr *> col_ref_map;
std::unordered_map<String, std::vector<tipb::Expr *>> col_ref_map;
};

void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::unordered_set<String> & referred_columns,
std::unordered_map<String, tipb::Expr *> & col_ref_map)
std::unordered_map<String, std::vector<tipb::Expr *>> & col_ref_map)
{
if (ASTIdentifier * id = typeid_cast<ASTIdentifier *>(ast.get()))
{
Expand All @@ -159,7 +159,9 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
*(expr->mutable_field_type()) = columnInfoToFieldType((*ft).second);

referred_columns.emplace((*ft).first);
col_ref_map.emplace((*ft).first, expr);
if (col_ref_map.find((*ft).first) == col_ref_map.end())
col_ref_map[(*ft).first] = {};
col_ref_map[(*ft).first].push_back(expr);
}
else if (ASTFunction * func = typeid_cast<ASTFunction *>(ast.get()))
{
Expand Down Expand Up @@ -208,6 +210,20 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "less")
{
expr->set_sig(tipb::ScalarFuncSig::LTInt);
auto * ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "lessorequals")
{
expr->set_sig(tipb::ScalarFuncSig::LEInt);
auto * ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else
{
throw Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -259,7 +275,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
}

void compileFilter(const DAGSchema & input, ASTPtr ast, tipb::Selection * filter, std::unordered_set<String> & referred_columns,
std::unordered_map<String, tipb::Expr *> & col_ref_map)
std::unordered_map<String, std::vector<tipb::Expr *>> & col_ref_map)
{
if (auto * func = typeid_cast<ASTFunction *>(ast.get()))
{
Expand Down Expand Up @@ -339,7 +355,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ci.tp = TiDB::TypeTimestamp;
ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci)));
}
executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, tipb::Expr *>{}});
executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, std::vector<tipb::Expr *>>{}});
last_executor = ts_exec;
}

Expand All @@ -349,7 +365,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
tipb::Executor * filter_exec = dag_request.add_executors();
filter_exec->set_tp(tipb::ExecType::TypeSelection);
tipb::Selection * filter = filter_exec->mutable_selection();
std::unordered_map<String, tipb::Expr *> col_ref_map;
std::unordered_map<String, std::vector<tipb::Expr *>> col_ref_map;
compileFilter(executor_ctx_map[last_executor].output, ast_query.where_expression, filter, referred_columns, col_ref_map);
executor_ctx_map.emplace(filter_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::move(col_ref_map)});
last_executor = filter_exec;
Expand All @@ -361,7 +377,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
tipb::Executor * topn_exec = dag_request.add_executors();
topn_exec->set_tp(tipb::ExecType::TypeTopN);
tipb::TopN * topn = topn_exec->mutable_topn();
std::unordered_map<String, tipb::Expr *> col_ref_map;
std::unordered_map<String, std::vector<tipb::Expr *>> col_ref_map;
for (const auto & child : ast_query.order_expression_list->children)
{
ASTOrderByElement * elem = typeid_cast<ASTOrderByElement *>(child.get());
Expand All @@ -385,7 +401,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
auto limit_length = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*ast_query.limit_length).value);
limit->set_limit(limit_length);
executor_ctx_map.emplace(
limit_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map<String, tipb::Expr *>{}});
limit_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map<String, std::vector<tipb::Expr *>>{}});
last_executor = limit_exec;
}

Expand Down Expand Up @@ -425,7 +441,9 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
throw Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR);
std::stringstream ss;
encodeDAGInt64(iter - last_output.begin(), ss);
pair.second->set_val(ss.str());
auto s_val = ss.str();
for (auto * expr : pair.second)
expr->set_val(s_val);
}
executor_ctx.output = last_output;
};
Expand All @@ -452,7 +470,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
tipb::Executor * agg_exec = dag_request.add_executors();
agg_exec->set_tp(tipb::ExecType::TypeAggregation);
tipb::Aggregation * agg = agg_exec->mutable_aggregation();
std::unordered_map<String, tipb::Expr *> col_ref_map;
std::unordered_map<String, std::vector<tipb::Expr *>> col_ref_map;
for (const auto & expr : ast_query.select_expression_list->children)
{
const ASTFunction * func = typeid_cast<const ASTFunction *>(expr.get());
Expand Down
37 changes: 16 additions & 21 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ bool isUInt8Type(const DataTypePtr & type)

String DAGExpressionAnalyzer::applyFunction(const String & func_name, Names & arg_names, ExpressionActionsPtr & actions)
{
const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context);
String result_name = genFuncString(func_name, arg_names);
if (actions->getSampleBlock().has(result_name))
return result_name;
const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context);
const ExpressionAction & apply_function = ExpressionAction::applyFunction(function_builder, arg_names, result_name);
actions->add(apply_function);
return result_name;
Expand Down Expand Up @@ -377,11 +379,11 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
auto * type_field_type = type_expr.mutable_field_type();
type_field_type->set_tp(TiDB::TypeString);
type_field_type->set_flag(TiDB::ColumnFlagNotNull);
getActions(type_expr, actions);
auto type_expr_name = getActions(type_expr, actions);

Names cast_argument_names;
cast_argument_names.push_back(expr_name);
cast_argument_names.push_back(getName(type_expr, getCurrentInputColumns()));
cast_argument_names.push_back(type_expr_name);
String cast_expr_name = applyFunction("CAST", cast_argument_names, actions);
return cast_expr_name;
}
Expand Down Expand Up @@ -450,23 +452,22 @@ static String getUniqueName(const Block & block, const String & prefix)

String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions)
{
String expr_name = getName(expr, getCurrentInputColumns());
if ((isLiteralExpr(expr) || isFunctionExpr(expr)) && actions->getSampleBlock().has(expr_name))
{
return expr_name;
}
if (isLiteralExpr(expr))
{
Field value = decodeLiteral(expr);
DataTypePtr type = exprHasValidFieldType(expr) ? getDataTypeByFieldType(expr.field_type()) : applyVisitor(FieldToDataType(), value);
DataTypePtr flash_type = applyVisitor(FieldToDataType(), value);
DataTypePtr target_type = exprHasValidFieldType(expr) ? getDataTypeByFieldType(expr.field_type()) : flash_type;
String name = exprToString(expr, getCurrentInputColumns()) + "_" + target_type->getName();
if (actions->getSampleBlock().has(name))
return name;

ColumnWithTypeAndName column;
column.column = type->createColumnConst(1, convertFieldToType(value, *type));
column.name = expr_name;
column.type = type;
column.column = target_type->createColumnConst(1, convertFieldToType(value, *target_type, flash_type.get()));
column.name = name;
column.type = target_type;

actions->add(ExpressionAction::addColumn(column));
return column.name;
return name;
}
else if (isColumnExpr(expr))
{
Expand All @@ -476,7 +477,7 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
//todo check if the column type need to be cast to field type
return expr_name;
return getCurrentInputColumns()[column_id].name;
}
else if (isFunctionExpr(expr))
{
Expand Down Expand Up @@ -515,13 +516,7 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
}
}

// need to re-construct expr_name, because expr_name generated previously is based on expr tree,
// but for function call, it's argument name may be changed as an implicit cast func maybe
// inserted(refer to the logic below), so we need to update the expr_name
// for example, for a expr and(arg1, arg2), the expr_name is and(arg1_name,arg2_name), but
// if the arg1 need to be casted to the type passed by dag request, then the expr_name
// should be updated to and(casted_arg1_name, arg2_name)
expr_name = applyFunction(func_name, argument_names, actions);
String expr_name = applyFunction(func_name, argument_names, actions);
// add cast if needed
expr_name = appendCastIfNeeded(expr, actions, expr_name);
return expr_name;
Expand Down
11 changes: 3 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const String & getFunctionName(const tipb::Expr & expr)
}
}

String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col, bool for_parser)
String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col)
{
std::stringstream ss;
Int64 column_id = 0;
Expand Down Expand Up @@ -123,7 +123,7 @@ String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair>
throw Exception(tipb::ExprType_Name(expr.tp()) + " not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
// build function expr
if (isInOrGlobalInOperator(func_name) && for_parser)
if (isInOrGlobalInOperator(func_name))
{
// for in, we could not represent the function expr using func_name(param1, param2, ...)
throw Exception("Function " + func_name + " not supported", ErrorCodes::UNSUPPORTED_METHOD);
Expand All @@ -132,7 +132,7 @@ String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair>
bool first = true;
for (const tipb::Expr & child : expr.children())
{
String s = exprToString(child, input_col, for_parser);
String s = exprToString(child, input_col);
if (first)
{
first = false;
Expand All @@ -149,11 +149,6 @@ String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair>

const String & getTypeName(const tipb::Expr & expr) { return tipb::ExprType_Name(expr.tp()); }

String getName(const tipb::Expr & expr, const std::vector<NameAndTypePair> & current_input_columns)
{
return exprToString(expr, current_input_columns, false);
}

bool isAggFunctionExpr(const tipb::Expr & expr)
{
switch (expr.tp())
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ const String & getFunctionName(const tipb::Expr & expr);
const String & getAggFunctionName(const tipb::Expr & expr);
bool isColumnExpr(const tipb::Expr & expr);
ColumnID getColumnID(const tipb::Expr & expr);
String getName(const tipb::Expr & expr, const std::vector<NameAndTypePair> & current_input_columns);
const String & getTypeName(const tipb::Expr & expr);
String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col, bool for_parser = true);
String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col);
bool isInOrGlobalInOperator(const String & name);
bool exprHasValidFieldType(const tipb::Expr & expr);
extern std::unordered_map<tipb::ExprType, String> agg_func_map;
Expand Down
6 changes: 6 additions & 0 deletions tests/mutable-test/txn_dag/filter.test
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
│ 777 │ test2 │ 777 │
└───────┴───────┴───────┘

# DAG read, col used multiple times in the query
=> DBGInvoke dag('select * from default.test where col_2 < 777 or col_2 > 888')
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
└───────┴───────┘

# Mock DAG read, where and.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' and col_2 = 777', 4)
┌─col_2─┬─col_1─┬─col_2─┐
Expand Down