-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
throw exception when meet error duing cop request handling #162
Changes from 10 commits
71c09fb
6b8a054
6f32efd
11b3e09
64fef5c
f96fcf4
324b64d
6b06122
2327e9f
72d11ad
428459a
f3eb6e5
d8bb7d9
b6eaa3b
fe7916e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,51 +11,56 @@ | |
namespace DB | ||
{ | ||
|
||
bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) | ||
namespace ErrorCodes | ||
{ | ||
TableID id; | ||
extern const int UNKNOWN_TABLE; | ||
extern const int COP_BAD_DAG_REQUEST; | ||
} // namespace ErrorCodes | ||
|
||
void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) | ||
{ | ||
TableID table_id; | ||
if (ts.has_table_id()) | ||
{ | ||
id = ts.table_id(); | ||
table_id = ts.table_id(); | ||
} | ||
else | ||
{ | ||
// do not have table id | ||
return false; | ||
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); | ||
} | ||
auto & tmt_ctx = context.getTMTContext(); | ||
auto storage = tmt_ctx.getStorages().get(id); | ||
auto storage = tmt_ctx.getStorages().get(table_id); | ||
if (storage == nullptr) | ||
{ | ||
return false; | ||
throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); | ||
} | ||
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get()); | ||
if (!merge_tree) | ||
{ | ||
return false; | ||
throw Exception("Only MergeTree table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST); | ||
} | ||
|
||
if (ts.columns_size() == 0) | ||
{ | ||
// no column selected, must be something wrong | ||
return false; | ||
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); | ||
} | ||
columns_from_ts = storage->getColumns().getAllPhysical(); | ||
for (const tipb::ColumnInfo & ci : ts.columns()) | ||
{ | ||
ColumnID cid = ci.column_id(); | ||
if (cid <= 0 || cid > (ColumnID)columns_from_ts.size()) | ||
{ | ||
throw Exception("column id out of bound"); | ||
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); | ||
} | ||
String name = merge_tree->getTableInfo().columns[cid - 1].name; | ||
output_from_ts.push_back(std::move(name)); | ||
} | ||
ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " "; | ||
return true; | ||
} | ||
|
||
bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss) | ||
void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss) | ||
{ | ||
bool first = true; | ||
for (const tipb::Expr & expr : sel.conditions()) | ||
|
@@ -72,35 +77,30 @@ bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::string | |
} | ||
ss << s << " "; | ||
} | ||
return true; | ||
} | ||
|
||
bool DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) | ||
{ | ||
ss << "LIMIT " << limit.limit() << " "; | ||
return true; | ||
} | ||
void DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) { ss << "LIMIT " << limit.limit() << " "; } | ||
|
||
//todo return the error message | ||
bool DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss) | ||
void DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss) | ||
{ | ||
switch (executor.tp()) | ||
{ | ||
case tipb::ExecType::TypeTableScan: | ||
return buildTSString(executor.tbl_scan(), ss); | ||
case tipb::ExecType::TypeIndexScan: | ||
// index scan not supported | ||
return false; | ||
throw Exception("IndexScan is not supported"); | ||
case tipb::ExecType::TypeSelection: | ||
return buildSelString(executor.selection(), ss); | ||
case tipb::ExecType::TypeAggregation: | ||
// stream agg is not supported, treated as normal agg | ||
case tipb::ExecType::TypeStreamAgg: | ||
//todo support agg | ||
return false; | ||
throw Exception("Aggregation is not supported"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
case tipb::ExecType::TypeTopN: | ||
// todo support top n | ||
return false; | ||
throw Exception("TopN is not supported"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
case tipb::ExecType::TypeLimit: | ||
return buildLimitString(executor.limit(), ss); | ||
} | ||
|
@@ -123,10 +123,7 @@ String DAGStringConverter::buildSqlString() | |
std::stringstream project; | ||
for (const tipb::Executor & executor : dag_request.executors()) | ||
{ | ||
if (!buildString(executor, query_buf)) | ||
{ | ||
return ""; | ||
} | ||
buildString(executor, query_buf); | ||
} | ||
if (!isProject(dag_request.executors(dag_request.executors_size() - 1))) | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,7 +69,7 @@ class InterpreterDAG : public IInterpreter | |
}; | ||
|
||
bool executeImpl(Pipeline & pipeline); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function returns bool? |
||
bool executeTS(const tipb::TableScan & ts, Pipeline & pipeline); | ||
void executeTS(const tipb::TableScan & ts, Pipeline & pipeline); | ||
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column); | ||
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr); | ||
void executeOrder(Pipeline & pipeline, Strings & order_column_names); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error code?