Skip to content

Commit

Permalink
Fix date/datetime/bit encode error (#200)
Browse files Browse the repository at this point in the history
* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test

* Pass tests after merging master

* Copy some changes from xufei

* Enable date/datetime test

* Enable date/datetime test

* Refine code

* Adjust date/datetime tiflash rep to UInt

* Fix datetime to Int

* Typo
  • Loading branch information
zanmato1984 authored Aug 26, 2019
1 parent 08bacd7 commit e8b4198
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 55 deletions.
41 changes: 23 additions & 18 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TMTContext.h>
Expand All @@ -29,9 +30,12 @@ extern const int BAD_ARGUMENTS;
extern const int LOGICA_ERROR;
} // namespace ErrorCodes

using TiDB::DatumFlat;
using TiDB::TableInfo;

using DAGColumnInfo = std::pair<String, ColumnInfo>;
using DAGSchema = std::vector<DAGColumnInfo>;
using SchemaFetcher = std::function<TiDB::TableInfo(const String &, const String &)>;
using SchemaFetcher = std::function<TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts);
tipb::SelectResponse executeDAGRequest(
Expand Down Expand Up @@ -138,7 +142,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
{
auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) { return field.first == id->getColumnName(); });
if (ft == input.end())
throw DB::Exception("No such column " + id->getColumnName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
throw Exception("No such column " + id->getColumnName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
expr->set_tp(tipb::ColumnRef);
*(expr->mutable_field_type()) = columnInfoToFieldType((*ft).second);

Expand Down Expand Up @@ -183,7 +187,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
}
else
{
throw DB::Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
}
expr->set_tp(tipb::ExprType::ScalarFunc);
}
Expand Down Expand Up @@ -221,13 +225,13 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
encodeDAGBytes(lit->value.get<String>(), ss);
break;
default:
throw DB::Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR);
}
expr->set_val(ss.str());
}
else
{
throw DB::Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR);
}
}

Expand Down Expand Up @@ -262,7 +266,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ASTSelectQuery & ast_query = typeid_cast<ASTSelectQuery &>(*ast);

/// Get table metadata.
TiDB::TableInfo table_info;
TableInfo table_info;
{
String database_name, table_name;
auto query_database = ast_query.database();
Expand Down Expand Up @@ -333,7 +337,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
{
ASTOrderByElement * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw DB::Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = topn->add_order_by();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
Expand Down Expand Up @@ -389,7 +393,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
{
auto iter = std::find_if(last_output.begin(), last_output.end(), [&](const auto & field) { return field.first == pair.first; });
if (iter == last_output.end())
throw DB::Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR);
throw Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR);
std::stringstream ss;
encodeDAGInt64(iter - last_output.begin(), ss);
pair.second->set_val(ss.str());
Expand All @@ -414,7 +418,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
if (has_gby || has_agg_func)
{
if (last_executor->has_limit() || last_executor->has_topn())
throw DB::Exception("Limit/TopN and Agg cannot co-exist.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Limit/TopN and Agg cannot co-exist.", ErrorCodes::LOGICAL_ERROR);

tipb::Executor * agg_exec = dag_request.add_executors();
agg_exec->set_tp(tipb::ExecType::TypeAggregation);
Expand All @@ -424,7 +428,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
{
const ASTFunction * func = typeid_cast<const ASTFunction *>(expr.get());
if (!func || !AggregateFunctionFactory::instance().isAggregateFunctionName(func->name))
throw DB::Exception("Only agg function is allowed in select for a query with aggregation", ErrorCodes::LOGICAL_ERROR);
throw Exception("Only agg function is allowed in select for a query with aggregation", ErrorCodes::LOGICAL_ERROR);

tipb::Expr * agg_func = agg->add_agg_func();

Expand All @@ -444,7 +448,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
// TODO: Other agg func.
else
{
throw DB::Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR);
}

schema.emplace_back(std::make_pair(func->getColumnName(), fieldTypeToColumnInfo(agg_func->field_type())));
Expand Down Expand Up @@ -489,7 +493,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
}
else
{
throw DB::Exception("Unsupported expression type in select", ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported expression type in select", ErrorCodes::LOGICAL_ERROR);
}
}

Expand All @@ -501,7 +505,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
auto iter
= std::find_if(last_output.begin(), last_output.end(), [&](const auto & last_field) { return last_field.first == field; });
if (iter == last_output.end())
throw DB::Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR);
throw Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR);
dag_request.add_output_offsets(iter - last_output.begin());
schema.push_back(*iter);
}
Expand All @@ -526,18 +530,18 @@ tipb::SelectResponse executeDAGRequest(
BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response)
{
if (dag_response.has_error())
throw DB::Exception(dag_response.error().msg(), dag_response.error().code());
throw Exception(dag_response.error().msg(), dag_response.error().code());

BlocksList blocks;
for (const auto & chunk : dag_response.chunks())
{
std::vector<std::vector<DB::Field>> rows;
std::vector<DB::Field> curr_row;
std::vector<std::vector<Field>> rows;
std::vector<Field> curr_row;
const std::string & data = chunk.rows_data();
size_t cursor = 0;
while (cursor < data.size())
{
curr_row.push_back(DB::DecodeDatum(cursor, data));
curr_row.push_back(DecodeDatum(cursor, data));
if (curr_row.size() == schema.size())
{
rows.emplace_back(std::move(curr_row));
Expand All @@ -558,7 +562,8 @@ BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const
{
for (size_t i = 0; i < row.size(); i++)
{
columns[i].column->assumeMutable()->insert(row[i]);
const Field & field = row[i];
columns[i].column->assumeMutable()->insert(DatumFlat(field, schema[i].second.tp).field());
}
}

Expand Down
30 changes: 23 additions & 7 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,24 @@ T convertNumber(const Field & field)
}
}

Field convertDecimal(UInt32 scale, const Field & field)
Field convertDecimal(const ColumnInfo & column_info, const Field & field)
{
switch (field.getType())
{
case Field::Types::Int64:
return DecimalField(ToDecimal<Int64, Decimal64>(field.get<Int64>(), scale), scale);
return column_info.getDecimalValue(std::to_string(field.get<Int64>()));
case Field::Types::UInt64:
return DecimalField(ToDecimal<Int64, Decimal64>(field.get<UInt64>(), scale), scale);
return column_info.getDecimalValue(std::to_string(field.get<UInt64>()));
case Field::Types::Float64:
return DecimalField(ToDecimal<Float64, Decimal64>(field.get<Float64>(), scale), scale);
return column_info.getDecimalValue(std::to_string(field.get<Float64>()));
case Field::Types::Decimal32:
return column_info.getDecimalValue(field.get<Decimal32>().toString(column_info.decimal));
case Field::Types::Decimal64:
return column_info.getDecimalValue(field.get<Decimal64>().toString(column_info.decimal));
case Field::Types::Decimal128:
return column_info.getDecimalValue(field.get<Decimal128>().toString(column_info.decimal));
case Field::Types::Decimal256:
return field;
return column_info.getDecimalValue(field.get<Decimal256>().toString(column_info.decimal));
default:
throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to number", ErrorCodes::LOGICAL_ERROR);
}
Expand Down Expand Up @@ -204,9 +207,22 @@ Field convertField(const ColumnInfo & column_info, const Field & field)
case TiDB::TypeDouble:
return convertNumber<Float64>(field);
case TiDB::TypeDate:
{
auto text = field.get<String>();
ReadBufferFromMemory buf(text.data(), text.size());
DayNum_t date;
readDateText(date, buf);
return static_cast<UInt64>(date);
}
case TiDB::TypeDatetime:
case TiDB::TypeTimestamp:
return DB::parseMyDatetime(field.get<String>());
{
auto text = field.get<String>();
ReadBufferFromMemory buf(text.data(), text.size());
time_t dt;
readDateTimeText(dt, buf);
return static_cast<Int64>(dt);
}
case TiDB::TypeVarchar:
case TiDB::TypeTinyBlob:
case TiDB::TypeMediumBlob:
Expand All @@ -221,7 +237,7 @@ Field convertField(const ColumnInfo & column_info, const Field & field)
return Field();
case TiDB::TypeDecimal:
case TiDB::TypeNewDecimal:
return convertDecimal(column_info.decimal, field);
return convertDecimal(column_info, field);
case TiDB::TypeTime:
throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to Time", ErrorCodes::LOGICAL_ERROR);
case TiDB::TypeYear:
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
type_expr.set_tp(tipb::ExprType::String);
std::stringstream ss;
type_expr.set_val(expected_type->getName());
auto type_field_type = type_expr.field_type();
type_field_type.set_tp(0xfe);
type_field_type.set_flag(1);
auto * type_field_type = type_expr.mutable_field_type();
type_field_type->set_tp(0xfe);
type_field_type->set_flag(1);
getActions(type_expr, actions);

Names cast_argument_names;
Expand Down
22 changes: 21 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/TiDB.h>

#include <unordered_map>
Expand Down Expand Up @@ -84,6 +85,16 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
else
throw Exception("Not decimal literal" + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
}
case tipb::ExprType::MysqlTime:
{
if (!expr.has_field_type()
|| (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime
&& expr.field_type().tp() != TiDB::TypeTimestamp))
throw Exception("Invalid MySQL Time literal " + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
auto t = decodeDAGUInt64(expr.val());
// TODO: Use timezone in DAG request.
return std::to_string(TiDB::DatumFlat(t, static_cast<TiDB::TP>(expr.field_type().tp())).field().get<Int64>());
}
case tipb::ExprType::ColumnRef:
column_id = decodeDAGInt64(expr.val());
if (column_id < 0 || column_id >= (ColumnID)input_col.size())
Expand Down Expand Up @@ -222,12 +233,21 @@ Field decodeLiteral(const tipb::Expr & expr)
return decodeDAGBytes(expr.val());
case tipb::ExprType::MysqlDecimal:
return decodeDAGDecimal(expr.val());
case tipb::ExprType::MysqlTime:
{
if (!expr.has_field_type()
|| (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime
&& expr.field_type().tp() != TiDB::TypeTimestamp))
throw Exception("Invalid MySQL Time literal " + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
auto t = decodeDAGUInt64(expr.val());
// TODO: Use timezone in DAG request.
return TiDB::DatumFlat(t, static_cast<TiDB::TP>(expr.field_type().tp())).field();
}
case tipb::ExprType::MysqlBit:
case tipb::ExprType::MysqlDuration:
case tipb::ExprType::MysqlEnum:
case tipb::ExprType::MysqlHex:
case tipb::ExprType::MysqlSet:
case tipb::ExprType::MysqlTime:
case tipb::ExprType::MysqlJson:
case tipb::ExprType::ValueList:
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
Expand Down
Loading

0 comments on commit e8b4198

Please sign in to comment.