Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix date/datetime/bit encode error #200

Merged
merged 40 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
309ab6b
Enhance dbg invoke and add dag as schemaful function
zanmato1984 Aug 7, 2019
31d83c4
Add basic sql parse to dag
zanmato1984 Aug 8, 2019
3c4c508
Merge cop
zanmato1984 Aug 8, 2019
2de7311
Merge branch 'cop' into cop-ruoxi
zanmato1984 Aug 8, 2019
63a5800
Column id starts from 1
zanmato1984 Aug 8, 2019
232e7de
Fix value to ref
zanmato1984 Aug 8, 2019
1b14a12
Add basic dag test
zanmato1984 Aug 8, 2019
25eb831
Fix dag bugs and pass 1st mock test
zanmato1984 Aug 8, 2019
80f9fc6
Make dag go normal routine and add mock dag
zanmato1984 Aug 8, 2019
a1173e1
Add todo
zanmato1984 Aug 8, 2019
c8109f6
Add comment
zanmato1984 Aug 8, 2019
7dc0397
Fix gcc compile error
zanmato1984 Aug 8, 2019
66d9e8a
Enhance dag test
zanmato1984 Aug 8, 2019
36d1117
Address comments
zanmato1984 Aug 9, 2019
8aea5aa
Merge branch 'cop' into cop-ruoxi
zanmato1984 Aug 9, 2019
f62b318
Merge branch 'cop' into cop-ruoxi
zanmato1984 Aug 12, 2019
a9fe9f9
Enhance mock sql -> dag compiler and add project test
zanmato1984 Aug 12, 2019
1372262
Mock sql dag compiler support more expression types and add filter test
zanmato1984 Aug 13, 2019
e2f9a02
Add topn and limit test
zanmato1984 Aug 13, 2019
8cea243
Add agg for sql -> dag parser and agg test
zanmato1984 Aug 13, 2019
5008a7a
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 14, 2019
8fb4d52
Add dag specific codec
zanmato1984 Aug 15, 2019
c77310e
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 15, 2019
0c8e3a5
type
zanmato1984 Aug 15, 2019
76b5444
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 15, 2019
41d2b4f
Update codec accordingly
zanmato1984 Aug 15, 2019
17111f5
Remove cop-test
zanmato1984 Aug 15, 2019
f3adf8e
Merge cop
zanmato1984 Aug 19, 2019
31684fa
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 20, 2019
7a74ffb
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 23, 2019
0c29e49
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 24, 2019
d6eefa7
Pass tests after merging master
zanmato1984 Aug 24, 2019
916f5bb
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 25, 2019
50873d6
Copy some changes from xufei
zanmato1984 Aug 25, 2019
96de82e
Enable date/datetime test
zanmato1984 Aug 25, 2019
ac37a78
Enable date/datetime test
zanmato1984 Aug 25, 2019
2153142
Refine code
zanmato1984 Aug 25, 2019
1485066
Adjust date/datetime tiflash rep to UInt
zanmato1984 Aug 26, 2019
d01d0b6
Fix datetime to Int
zanmato1984 Aug 26, 2019
68b4955
Typo
zanmato1984 Aug 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TMTContext.h>
Expand All @@ -29,9 +30,12 @@ extern const int BAD_ARGUMENTS;
extern const int LOGICA_ERROR;
} // namespace ErrorCodes

using TiDB::DatumFlat;
using TiDB::TableInfo;

using DAGColumnInfo = std::pair<String, ColumnInfo>;
using DAGSchema = std::vector<DAGColumnInfo>;
using SchemaFetcher = std::function<TiDB::TableInfo(const String &, const String &)>;
using SchemaFetcher = std::function<TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts);
tipb::SelectResponse executeDAGRequest(
Expand Down Expand Up @@ -138,7 +142,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
{
auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) { return field.first == id->getColumnName(); });
if (ft == input.end())
throw DB::Exception("No such column " + id->getColumnName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
throw Exception("No such column " + id->getColumnName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
expr->set_tp(tipb::ColumnRef);
*(expr->mutable_field_type()) = columnInfoToFieldType((*ft).second);

Expand Down Expand Up @@ -183,7 +187,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
}
else
{
throw DB::Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
}
expr->set_tp(tipb::ExprType::ScalarFunc);
}
Expand Down Expand Up @@ -221,13 +225,13 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
encodeDAGBytes(lit->value.get<String>(), ss);
break;
default:
throw DB::Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR);
throw Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR);
}
expr->set_val(ss.str());
}
else
{
throw DB::Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR);
}
}

Expand Down Expand Up @@ -262,7 +266,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ASTSelectQuery & ast_query = typeid_cast<ASTSelectQuery &>(*ast);

/// Get table metadata.
TiDB::TableInfo table_info;
TableInfo table_info;
{
String database_name, table_name;
auto query_database = ast_query.database();
Expand Down Expand Up @@ -333,7 +337,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
{
ASTOrderByElement * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw DB::Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = topn->add_order_by();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
Expand Down Expand Up @@ -389,7 +393,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
{
auto iter = std::find_if(last_output.begin(), last_output.end(), [&](const auto & field) { return field.first == pair.first; });
if (iter == last_output.end())
throw DB::Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR);
throw Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR);
std::stringstream ss;
encodeDAGInt64(iter - last_output.begin(), ss);
pair.second->set_val(ss.str());
Expand All @@ -414,7 +418,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
if (has_gby || has_agg_func)
{
if (last_executor->has_limit() || last_executor->has_topn())
throw DB::Exception("Limit/TopN and Agg cannot co-exist.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Limit/TopN and Agg cannot co-exist.", ErrorCodes::LOGICAL_ERROR);

tipb::Executor * agg_exec = dag_request.add_executors();
agg_exec->set_tp(tipb::ExecType::TypeAggregation);
Expand All @@ -424,7 +428,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
{
const ASTFunction * func = typeid_cast<const ASTFunction *>(expr.get());
if (!func || !AggregateFunctionFactory::instance().isAggregateFunctionName(func->name))
throw DB::Exception("Only agg function is allowed in select for a query with aggregation", ErrorCodes::LOGICAL_ERROR);
throw Exception("Only agg function is allowed in select for a query with aggregation", ErrorCodes::LOGICAL_ERROR);

tipb::Expr * agg_func = agg->add_agg_func();

Expand All @@ -444,7 +448,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
// TODO: Other agg func.
else
{
throw DB::Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR);
}

schema.emplace_back(std::make_pair(func->getColumnName(), fieldTypeToColumnInfo(agg_func->field_type())));
Expand Down Expand Up @@ -489,7 +493,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
}
else
{
throw DB::Exception("Unsupported expression type in select", ErrorCodes::LOGICAL_ERROR);
throw Exception("Unsupported expression type in select", ErrorCodes::LOGICAL_ERROR);
}
}

Expand All @@ -501,7 +505,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
auto iter
= std::find_if(last_output.begin(), last_output.end(), [&](const auto & last_field) { return last_field.first == field; });
if (iter == last_output.end())
throw DB::Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR);
throw Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR);
dag_request.add_output_offsets(iter - last_output.begin());
schema.push_back(*iter);
}
Expand All @@ -526,18 +530,18 @@ tipb::SelectResponse executeDAGRequest(
BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response)
{
if (dag_response.has_error())
throw DB::Exception(dag_response.error().msg(), dag_response.error().code());
throw Exception(dag_response.error().msg(), dag_response.error().code());

BlocksList blocks;
for (const auto & chunk : dag_response.chunks())
{
std::vector<std::vector<DB::Field>> rows;
std::vector<DB::Field> curr_row;
std::vector<std::vector<Field>> rows;
std::vector<Field> curr_row;
const std::string & data = chunk.rows_data();
size_t cursor = 0;
while (cursor < data.size())
{
curr_row.push_back(DB::DecodeDatum(cursor, data));
curr_row.push_back(DecodeDatum(cursor, data));
if (curr_row.size() == schema.size())
{
rows.emplace_back(std::move(curr_row));
Expand All @@ -558,7 +562,8 @@ BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const
{
for (size_t i = 0; i < row.size(); i++)
{
columns[i].column->assumeMutable()->insert(row[i]);
const Field & field = row[i];
columns[i].column->assumeMutable()->insert(DatumFlat(field, schema[i].second.tp).field());
}
}

Expand Down
30 changes: 23 additions & 7 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,24 @@ T convertNumber(const Field & field)
}
}

Field convertDecimal(UInt32 scale, const Field & field)
Field convertDecimal(const ColumnInfo & column_info, const Field & field)
{
switch (field.getType())
{
case Field::Types::Int64:
return DecimalField(ToDecimal<Int64, Decimal64>(field.get<Int64>(), scale), scale);
return column_info.getDecimalValue(std::to_string(field.get<Int64>()));
case Field::Types::UInt64:
return DecimalField(ToDecimal<Int64, Decimal64>(field.get<UInt64>(), scale), scale);
return column_info.getDecimalValue(std::to_string(field.get<UInt64>()));
case Field::Types::Float64:
return DecimalField(ToDecimal<Float64, Decimal64>(field.get<Float64>(), scale), scale);
return column_info.getDecimalValue(std::to_string(field.get<Float64>()));
case Field::Types::Decimal32:
return column_info.getDecimalValue(field.get<Decimal32>().toString(column_info.decimal));
case Field::Types::Decimal64:
return column_info.getDecimalValue(field.get<Decimal64>().toString(column_info.decimal));
case Field::Types::Decimal128:
return column_info.getDecimalValue(field.get<Decimal128>().toString(column_info.decimal));
case Field::Types::Decimal256:
return field;
return column_info.getDecimalValue(field.get<Decimal256>().toString(column_info.decimal));
default:
throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to number", ErrorCodes::LOGICAL_ERROR);
}
Expand Down Expand Up @@ -204,9 +207,22 @@ Field convertField(const ColumnInfo & column_info, const Field & field)
case TiDB::TypeDouble:
return convertNumber<Float64>(field);
case TiDB::TypeDate:
{
auto text = field.get<String>();
ReadBufferFromMemory buf(text.data(), text.size());
DayNum_t date;
readDateText(date, buf);
return static_cast<Int64>(date);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be UInt64 or UInt32

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this function converts an arbitrary Field (coming from an AST literal) to a non-flat (a.k.a. bumpy) representation (also a Field) in TiFlash. This bumpy representation is then immediately flattened using DatumBumpy into the flat representation that suits the encodingRow.

In the bumpy representation, data is in TiFlash's format (hence Int64 for Date/DateTime). In the flat representation, data is in TiDB's format (hence UInt64 for Date/DateTime).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in TiFlash's format, Date is represented by UInt32, so I suggest using UInt32 or UInt64 here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, UInt it is. I asked @hanfei1991 several times and he kept giving me the wrong answer.

}
case TiDB::TypeDatetime:
case TiDB::TypeTimestamp:
return DB::parseMyDatetime(field.get<String>());
{
auto text = field.get<String>();
ReadBufferFromMemory buf(text.data(), text.size());
time_t dt;
readDateTimeText(dt, buf);
return static_cast<Int64>(dt);
}
case TiDB::TypeVarchar:
case TiDB::TypeTinyBlob:
case TiDB::TypeMediumBlob:
Expand All @@ -221,7 +237,7 @@ Field convertField(const ColumnInfo & column_info, const Field & field)
return Field();
case TiDB::TypeDecimal:
case TiDB::TypeNewDecimal:
return convertDecimal(column_info.decimal, field);
return convertDecimal(column_info, field);
case TiDB::TypeTime:
throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to Time", ErrorCodes::LOGICAL_ERROR);
case TiDB::TypeYear:
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
type_expr.set_tp(tipb::ExprType::String);
std::stringstream ss;
type_expr.set_val(expected_type->getName());
auto type_field_type = type_expr.field_type();
type_field_type.set_tp(0xfe);
type_field_type.set_flag(1);
auto * type_field_type = type_expr.mutable_field_type();
type_field_type->set_tp(0xfe);
type_field_type->set_flag(1);
getActions(type_expr, actions);

Names cast_argument_names;
Expand Down
22 changes: 21 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/Datum.h>
#include <Storages/Transaction/TiDB.h>

#include <unordered_map>
Expand Down Expand Up @@ -84,6 +85,16 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
else
throw Exception("Not decimal literal" + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
}
case tipb::ExprType::MysqlTime:
{
if (!expr.has_field_type()
|| (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime
&& expr.field_type().tp() != TiDB::TypeTimestamp))
throw Exception("Invalid MySQL Time literal " + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
auto t = decodeDAGUInt64(expr.val());
// TODO: Use timezone in DAG request.
return std::to_string(TiDB::DatumFlat(t, static_cast<TiDB::TP>(expr.field_type().tp())).field().get<Int64>());
}
case tipb::ExprType::ColumnRef:
column_id = decodeDAGInt64(expr.val());
if (column_id < 0 || column_id >= (ColumnID)input_col.size())
Expand Down Expand Up @@ -222,12 +233,21 @@ Field decodeLiteral(const tipb::Expr & expr)
return decodeDAGBytes(expr.val());
case tipb::ExprType::MysqlDecimal:
return decodeDAGDecimal(expr.val());
case tipb::ExprType::MysqlTime:
{
if (!expr.has_field_type()
|| (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime
&& expr.field_type().tp() != TiDB::TypeTimestamp))
throw Exception("Invalid MySQL Time literal " + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
auto t = decodeDAGUInt64(expr.val());
// TODO: Use timezone in DAG request.
return TiDB::DatumFlat(t, static_cast<TiDB::TP>(expr.field_type().tp())).field();
}
case tipb::ExprType::MysqlBit:
case tipb::ExprType::MysqlDuration:
case tipb::ExprType::MysqlEnum:
case tipb::ExprType::MysqlHex:
case tipb::ExprType::MysqlSet:
case tipb::ExprType::MysqlTime:
case tipb::ExprType::MysqlJson:
case tipb::ExprType::ValueList:
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
Expand Down
Loading