Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw exception when meet error duing cop request handling #162

Merged
merged 15 commits into from
Aug 7, 2019
Merged
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ namespace ErrorCodes
extern const int REGION_MISS = 10002;
extern const int SCHEMA_SYNC_ERROR = 10003;
extern const int SCHEMA_VERSION_ERROR = 10004;
extern const int COP_BAD_DAG_REQUEST = 10005;
}

}
24 changes: 13 additions & 11 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

namespace DB
{

namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

static String genCastString(const String & org_name, const String & target_type_name)
{
return "cast(" + org_name + ", " + target_type_name + ") ";
Expand Down Expand Up @@ -44,13 +50,13 @@ DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_co
after_agg = false;
}

bool DAGExpressionAnalyzer::appendAggregation(
void DAGExpressionAnalyzer::appendAggregation(
ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregation_keys, AggregateDescriptions & aggregate_descriptions)
{
if (agg.group_by_size() == 0 && agg.agg_func_size() == 0)
{
//should not reach here
return false;
throw Exception("Aggregation executor without group by/agg exprs", ErrorCodes::COP_BAD_DAG_REQUEST);
}
initChain(chain, getCurrentInputColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
Expand Down Expand Up @@ -94,14 +100,13 @@ bool DAGExpressionAnalyzer::appendAggregation(
aggregation_keys.push_back(name);
}
after_agg = true;
return true;
}

bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name)
void DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name)
{
if (sel.conditions_size() == 0)
{
return false;
throw Exception("Selection executor without condition exprs", ErrorCodes::COP_BAD_DAG_REQUEST);
}
tipb::Expr final_condition;
if (sel.conditions_size() > 1)
Expand All @@ -120,14 +125,13 @@ bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const ti
initChain(chain, getCurrentInputColumns());
filter_column_name = getActions(filter, chain.steps.back().actions);
chain.steps.back().required_output.push_back(filter_column_name);
return true;
}

bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names)
void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names)
{
if (topN.order_by_size() == 0)
{
return false;
throw Exception("TopN executor without order by exprs", ErrorCodes::COP_BAD_DAG_REQUEST);
}
initChain(chain, getCurrentInputColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
Expand All @@ -137,12 +141,11 @@ bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const
step.required_output.push_back(name);
order_column_names.push_back(name);
}
return true;
}

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; }

bool DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
{
initChain(chain, getCurrentInputColumns());
bool need_update_aggregated_columns = false;
Expand Down Expand Up @@ -191,7 +194,6 @@ bool DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
aggregated_columns.emplace_back(updated_aggregated_columns.getNames()[i], updated_aggregated_columns.getTypes()[i]);
}
}
return true;
}

String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name)
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ class DAGExpressionAnalyzer : private boost::noncopyable

public:
DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_);
bool appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);
bool appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
bool appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
void appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);
void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
bool appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name);
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
{
Expand Down
50 changes: 23 additions & 27 deletions dbms/src/Interpreters/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,57 @@
namespace DB
{

bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss)
namespace ErrorCodes
{
TableID id;
extern const int UNKNOWN_TABLE;
extern const int COP_BAD_DAG_REQUEST;
extern const int NOT_IMPLEMENTED;
} // namespace ErrorCodes

void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss)
{
TableID table_id;
if (ts.has_table_id())
{
id = ts.table_id();
table_id = ts.table_id();
}
else
{
// do not have table id
return false;
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
auto & tmt_ctx = context.getTMTContext();
auto storage = tmt_ctx.getStorages().get(id);
auto storage = tmt_ctx.getStorages().get(table_id);
if (storage == nullptr)
{
return false;
throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (!merge_tree)
{
return false;
throw Exception("Only MergeTree table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (ts.columns_size() == 0)
{
// no column selected, must be something wrong
return false;
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
columns_from_ts = storage->getColumns().getAllPhysical();
for (const tipb::ColumnInfo & ci : ts.columns())
{
ColumnID cid = ci.column_id();
if (cid <= 0 || cid > (ColumnID)columns_from_ts.size())
{
throw Exception("column id out of bound");
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = merge_tree->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
}
ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " ";
return true;
}

bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
{
bool first = true;
for (const tipb::Expr & expr : sel.conditions())
Expand All @@ -72,40 +78,33 @@ bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::string
}
ss << s << " ";
}
return true;
}

bool DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss)
{
ss << "LIMIT " << limit.limit() << " ";
return true;
}
void DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) { ss << "LIMIT " << limit.limit() << " "; }

//todo return the error message
bool DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss)
void DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss)
{
switch (executor.tp())
{
case tipb::ExecType::TypeTableScan:
return buildTSString(executor.tbl_scan(), ss);
case tipb::ExecType::TypeIndexScan:
// index scan not supported
return false;
throw Exception("IndexScan is not supported", ErrorCodes::NOT_IMPLEMENTED);
case tipb::ExecType::TypeSelection:
return buildSelString(executor.selection(), ss);
case tipb::ExecType::TypeAggregation:
// stream agg is not supported, treated as normal agg
case tipb::ExecType::TypeStreamAgg:
//todo support agg
return false;
throw Exception("Aggregation is not supported", ErrorCodes::NOT_IMPLEMENTED);
case tipb::ExecType::TypeTopN:
// todo support top n
return false;
throw Exception("TopN is not supported", ErrorCodes::NOT_IMPLEMENTED);
case tipb::ExecType::TypeLimit:
return buildLimitString(executor.limit(), ss);
}

return false;
}

bool isProject(const tipb::Executor &)
Expand All @@ -125,10 +124,7 @@ String DAGStringConverter::buildSqlString()
std::stringstream project;
for (const tipb::Executor & executor : dag_request.executors())
{
if (!buildString(executor, query_buf))
{
return "";
}
buildString(executor, query_buf);
}
if (!isProject(dag_request.executors(dag_request.executors_size() - 1)))
{
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/DAGStringConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class DAGStringConverter
}

protected:
bool buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
bool buildSelString(const tipb::Selection & sel, std::stringstream & ss);
bool buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
bool buildString(const tipb::Executor & executor, std::stringstream & ss);
void buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
void buildSelString(const tipb::Selection & sel, std::stringstream & ss);
void buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
void buildString(const tipb::Executor & executor, std::stringstream & ss);

protected:
Context & context;
Expand Down
44 changes: 21 additions & 23 deletions dbms/src/Interpreters/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/Types.h>
Expand All @@ -30,19 +31,20 @@ extern const int UNKNOWN_TABLE;
extern const int TOO_MANY_COLUMNS;
extern const int SCHEMA_VERSION_ERROR;
extern const int UNKNOWN_EXCEPTION;
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_), dag(dag_), log(&Logger::get("InterpreterDAG"))
{}

// the flow is the same as executeFetchcolumns
bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
if (!ts.has_table_id())
{
// do not have table id
return false;
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
TableID table_id = ts.table_id();
// TODO: Get schema version from DAG request.
Expand All @@ -67,15 +69,15 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (cid < 1 || cid > (Int64)storage->getTableInfo().columns.size())
{
// cid out of bound
return false;
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = storage->getTableInfo().columns[cid - 1].name;
required_columns.push_back(name);
}
if (required_columns.empty())
{
// no column selected, must be something wrong
return false;
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (!dag.hasAggregation())
Expand All @@ -87,7 +89,7 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (i >= required_columns.size())
{
// array index out of bound
return false;
throw Exception("Output offset index is out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
// do not have alias
final_project.emplace_back(required_columns[i], "");
Expand Down Expand Up @@ -125,7 +127,10 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id);
if (!current_region)
{
return false;
//todo add more region error info in RegionException
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
}
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
Expand Down Expand Up @@ -164,7 +169,6 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName();
source_columns = storage->getColumns().getAllPhysical();
return true;
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand All @@ -175,17 +179,15 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
DAGExpressionAnalyzer analyzer(source_columns, context);
if (dag.hasSelection())
{
if (analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name))
{
res.has_where = true;
res.before_where = chain.getLastActions();
res.filter_column_name = chain.steps.back().required_output[0];
chain.addStep();
}
analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name);
res.has_where = true;
res.before_where = chain.getLastActions();
res.filter_column_name = chain.steps.back().required_output[0];
chain.addStep();
}
if (res.need_aggregate)
{
res.need_aggregate = analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
res.before_aggregation = chain.getLastActions();

chain.finalize();
Expand All @@ -201,7 +203,8 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
}
if (dag.hasTopN())
{
res.has_order_by = analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
res.has_order_by = true;
analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
}
// append final project results
for (auto & name : final_project)
Expand Down Expand Up @@ -423,13 +426,9 @@ void InterpreterDAG::executeOrder(Pipeline & pipeline, Strings & order_column_na
limit, settings.max_bytes_before_external_sort, context.getTemporaryPath());
}

//todo return the error message
bool InterpreterDAG::executeImpl(Pipeline & pipeline)
void InterpreterDAG::executeImpl(Pipeline & pipeline)
{
if (!executeTS(dag.getTS(), pipeline))
{
return false;
}
executeTS(dag.getTS(), pipeline);

auto res = analyzeExpressions();
// execute selection
Expand Down Expand Up @@ -458,7 +457,6 @@ bool InterpreterDAG::executeImpl(Pipeline & pipeline)
{
executeLimit(pipeline);
}
return true;
}

void InterpreterDAG::executeFinalProject(Pipeline & pipeline)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class InterpreterDAG : public IInterpreter
AggregateDescriptions aggregate_descriptions;
};

bool executeImpl(Pipeline & pipeline);
bool executeTS(const tipb::TableScan & ts, Pipeline & pipeline);
void executeImpl(Pipeline & pipeline);
void executeTS(const tipb::TableScan & ts, Pipeline & pipeline);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr);
void executeOrder(Pipeline & pipeline, Strings & order_column_names);
Expand Down