From 309ab6bca868bff2b0c5f5dbf87157ea26a8cd04 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 8 Aug 2019 02:24:47 +0800 Subject: [PATCH 01/21] Enhance dbg invoke and add dag as schemaful function --- dbms/src/Debug/DBGInvoker.cpp | 99 +++++++++++++++++---------- dbms/src/Debug/DBGInvoker.h | 11 ++- dbms/src/Debug/dbgFuncCoprocessor.cpp | 39 +++++++++++ dbms/src/Debug/dbgFuncCoprocessor.h | 18 +++++ 4 files changed, 126 insertions(+), 41 deletions(-) create mode 100644 dbms/src/Debug/dbgFuncCoprocessor.cpp create mode 100644 dbms/src/Debug/dbgFuncCoprocessor.h diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index acd6b61ba21..b00310bb464 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -29,40 +30,42 @@ void dbgFuncSleep(Context &, const ASTs & args, DBGInvoker::Printer output) DBGInvoker::DBGInvoker() { - regFunc("echo", dbgFuncEcho); + regSchemalessFunc("echo", dbgFuncEcho); // TODO: remove this, use sleep in bash script - regFunc("sleep", dbgFuncSleep); - - regFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable); - regFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition); - regFunc("rename_table_for_partition", MockTiDBTable::dbgFuncRenameTableForPartition); - regFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable); - regFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable); - regFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable); - regFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable); - regFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable); - regFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable); - - regFunc("set_flush_threshold", dbgFuncSetFlushThreshold); - - regFunc("raft_insert_row", dbgFuncRaftInsertRow); - regFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull); - regFunc("raft_insert_rows", dbgFuncRaftInsertRows); - regFunc("raft_update_rows", dbgFuncRaftUpdateRows); - regFunc("raft_delete_rows", dbgFuncRaftDelRows); - regFunc("raft_delete_row", dbgFuncRaftDeleteRow); - - regFunc("put_region", dbgFuncPutRegion); - regFunc("region_snapshot", dbgFuncRegionSnapshot); - regFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData); - - regFunc("try_flush", dbgFuncTryFlush); - regFunc("try_flush_region", dbgFuncTryFlushRegion); - - regFunc("dump_all_region", dbgFuncDumpAllRegion); - - regFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService); - regFunc("refresh_schemas", dbgFuncRefreshSchemas); + regSchemalessFunc("sleep", dbgFuncSleep); + + regSchemalessFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable); + regSchemalessFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition); + regSchemalessFunc("rename_table_for_partition", MockTiDBTable::dbgFuncRenameTableForPartition); + regSchemalessFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable); + regSchemalessFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable); + regSchemalessFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable); + regSchemalessFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable); + regSchemalessFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable); + regSchemalessFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable); + + regSchemalessFunc("set_flush_threshold", dbgFuncSetFlushThreshold); + + regSchemalessFunc("raft_insert_row", dbgFuncRaftInsertRow); + regSchemalessFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull); + regSchemalessFunc("raft_insert_rows", dbgFuncRaftInsertRows); + regSchemalessFunc("raft_update_rows", dbgFuncRaftUpdateRows); + regSchemalessFunc("raft_delete_rows", dbgFuncRaftDelRows); + regSchemalessFunc("raft_delete_row", dbgFuncRaftDeleteRow); + + regSchemalessFunc("put_region", dbgFuncPutRegion); + regSchemalessFunc("region_snapshot", dbgFuncRegionSnapshot); + regSchemalessFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData); + + regSchemalessFunc("try_flush", dbgFuncTryFlush); + regSchemalessFunc("try_flush_region", dbgFuncTryFlushRegion); + + regSchemalessFunc("dump_all_region", dbgFuncDumpAllRegion); + + regSchemalessFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService); + regSchemalessFunc("refresh_schemas", dbgFuncRefreshSchemas); + + regSchemafulFunc("dag", dbgFuncDAG); } void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement) @@ -94,10 +97,25 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or name = ori_name.substr(prefix_not_print_res.size(), ori_name.size() - prefix_not_print_res.size()); } - auto it = funcs.find(name); - if (it == funcs.end()) - throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS); + BlockInputStreamPtr res; + auto it_schemaless = schemaless_funcs.find(name); + if (it_schemaless != schemaless_funcs.end()) + res = invokeSchemaless(context, name, it_schemaless->second, args); + else + { + auto it_schemaful = schemaful_funcs.find(name); + if (it_schemaful != schemaful_funcs.end()) + res = invokeSchemaful(context, name, it_schemaful->second, args); + if (it_schemaful == schemaful_funcs.end()) + throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS); + } + + return print_res ? res : std::shared_ptr(); +} +BlockInputStreamPtr DBGInvoker::invokeSchemaless( + Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args) +{ std::stringstream col_name; col_name << name << "("; for (size_t i = 0; i < args.size(); ++i) @@ -110,9 +128,14 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or std::shared_ptr res = std::make_shared(col_name.str()); Printer printer = [&](const std::string & s) { res->append(s); }; - (it->second)(context, args, printer); + func(context, args, printer); - return print_res ? res : std::shared_ptr(); + return res; +} + +BlockInputStreamPtr DBGInvoker::invokeSchemaful(Context & context, const std::string &, const SchemafulDBGFunc & func, const ASTs & args) +{ + return func(context, args); } } // namespace DB diff --git a/dbms/src/Debug/DBGInvoker.h b/dbms/src/Debug/DBGInvoker.h index 71e8487f1fa..95b2449fd9b 100644 --- a/dbms/src/Debug/DBGInvoker.h +++ b/dbms/src/Debug/DBGInvoker.h @@ -25,16 +25,21 @@ class DBGInvoker { public: using Printer = std::function; - using DBGFunc = std::function; + using SchemalessDBGFunc = std::function; + using SchemafulDBGFunc = std::function; DBGInvoker(); - void regFunc(const std::string & name, DBGFunc func) { funcs[name] = func; } + void regSchemalessFunc(const std::string & name, SchemalessDBGFunc func) { schemaless_funcs[name] = func; } + void regSchemafulFunc(const std::string & name, SchemafulDBGFunc func) { schemaful_funcs[name] = func; } BlockInputStreamPtr invoke(Context & context, const std::string & ori_name, const ASTs & args); + BlockInputStreamPtr invokeSchemaless(Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args); + BlockInputStreamPtr invokeSchemaful(Context & context, const std::string & name, const SchemafulDBGFunc & func, const ASTs & args); private: - std::unordered_map funcs; + std::unordered_map schemaless_funcs; + std::unordered_map schemaful_funcs; }; } // namespace DB diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp new file mode 100644 index 00000000000..ef26f353c65 --- /dev/null +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int BAD_ARGUMENTS; +} // namespace ErrorCodes + +tipb::DAGRequest compileDAG(Context & context, const String & query); +tipb::SelectResponse executeDAG(Context & context, const tipb::DAGRequest & dag_request); + +BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) +{ + if (args.size() < 2 || args.size() > 3) + throw Exception("Args not matched, should be: query, region-id[, start-ts]", ErrorCodes::BAD_ARGUMENTS); + + String query = safeGet(typeid_cast(*args[0]).value); + // RegionID region_id = safeGet(typeid_cast(*args[1]).value); + // Timestamp start_ts = DEFAULT_MAX_READ_TSO; + // if (args.size() == 3) + // start_ts = safeGet(typeid_cast(*args[2]).value); + // if (start_ts == 0) + // start_ts = context.getTMTContext().getPDClient()->getTS(); + // + // tipb::DAGRequest dag_request = compileDAG(context, query); + // tipb::SelectResponse dag_response = executeDAG(context, dag_request); + + return executeQuery(query, context, true).in; +} + +} // namespace DB diff --git a/dbms/src/Debug/dbgFuncCoprocessor.h b/dbms/src/Debug/dbgFuncCoprocessor.h new file mode 100644 index 00000000000..ec18ea7428d --- /dev/null +++ b/dbms/src/Debug/dbgFuncCoprocessor.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class Context; + +// Coprocessor debug tools + +// Run a DAG request using given query (will be compiled to DAG executors), region ID and start ts. +// Usage: +// ./storages-client.sh "DBGInvoke dag(query, region_id, start_ts)" +BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args); + +} // namespace DB From 31d83c47dc344bf0b4f2623d38844c5df9107def Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 8 Aug 2019 16:35:08 +0800 Subject: [PATCH 02/21] Add basic sql parse to dag --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 148 ++++++++++++++++++++--- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 9 +- dbms/src/Flash/Coprocessor/DAGDriver.h | 4 +- dbms/src/Interpreters/executeQuery.cpp | 4 +- dbms/src/Interpreters/executeQuery.h | 2 +- 5 files changed, 145 insertions(+), 22 deletions(-) diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index ef26f353c65..502b5884817 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -1,9 +1,17 @@ #include +#include #include #include -#include +#include +#include #include +#include +#include +#include +#include +#include #include +#include #include namespace DB @@ -14,8 +22,12 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } // namespace ErrorCodes -tipb::DAGRequest compileDAG(Context & context, const String & query); -tipb::SelectResponse executeDAG(Context & context, const tipb::DAGRequest & dag_request); +using DAGField = std::pair; +using DAGSchema = std::vector; +std::tuple compileQuery(Context & context, const String & query, Timestamp start_ts); +tipb::SelectResponse executeDAGRequest( + Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version); +BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse dag_response); BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) { @@ -23,17 +35,125 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) throw Exception("Args not matched, should be: query, region-id[, start-ts]", ErrorCodes::BAD_ARGUMENTS); String query = safeGet(typeid_cast(*args[0]).value); - // RegionID region_id = safeGet(typeid_cast(*args[1]).value); - // Timestamp start_ts = DEFAULT_MAX_READ_TSO; - // if (args.size() == 3) - // start_ts = safeGet(typeid_cast(*args[2]).value); - // if (start_ts == 0) - // start_ts = context.getTMTContext().getPDClient()->getTS(); - // - // tipb::DAGRequest dag_request = compileDAG(context, query); - // tipb::SelectResponse dag_response = executeDAG(context, dag_request); - - return executeQuery(query, context, true).in; + RegionID region_id = safeGet(typeid_cast(*args[1]).value); + Timestamp start_ts = DEFAULT_MAX_READ_TSO; + if (args.size() == 3) + start_ts = safeGet(typeid_cast(*args[2]).value); + if (start_ts == 0) + start_ts = context.getTMTContext().getPDClient()->getTS(); + + RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id); + auto [schema, dag_request] = compileQuery(context, query, start_ts); + tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer()); + + return outputDAGResponse(context, schema, dag_response); +} + +std::tuple compileQuery(Context & context, const String & query, Timestamp start_ts) +{ + DAGSchema schema; + tipb::DAGRequest dag_request; + + dag_request.set_start_ts(start_ts); + + ParserSelectQuery parser; + ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "from DAG compiler", 0); + ASTSelectQuery & ast_query = typeid_cast(*ast); + + String database_name, table_name; + auto query_database = ast_query.database(); + auto query_table = ast_query.table(); + if (query_database) + database_name = typeid_cast(*query_database).name; + if (query_table) + table_name = typeid_cast(*query_table).name; + if (!query_table) + { + database_name = "system"; + table_name = "one"; + } + else if (!query_database) + { + database_name = context.getCurrentDatabase(); + } + const auto & table_info = MockTiDB::instance().getTableByName(database_name, table_name)->table_info; + + tipb::Executor * executor = dag_request.add_executors(); + executor->set_tp(tipb::ExecType::TypeTableScan); + tipb::TableScan * ts = executor->mutable_tbl_scan(); + ts->set_table_id(table_info.id); + size_t i = 0; + for (const auto & column_info : table_info.columns) + { + tipb::ColumnInfo * ci = ts->add_columns(); + ci->set_column_id(column_info.id); + ci->set_tp(column_info.tp); + ci->set_flag(column_info.flag); + + tipb::FieldType field_type; + field_type.set_tp(column_info.tp); + field_type.set_flag(column_info.flag); + field_type.set_flen(column_info.flen); + field_type.set_decimal(column_info.decimal); + schema.emplace_back(std::make_pair(column_info.name, std::move(field_type))); + + dag_request.add_output_offsets(i); + + i++; + } + + return std::make_pair(std::move(schema), std::move(dag_request)); +} + +tipb::SelectResponse executeDAGRequest( + Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version) +{ + tipb::SelectResponse dag_response; + DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response); + driver.execute(); + return dag_response; +} + +BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse dag_response) +{ + BlocksList blocks; + for (const auto & chunk : dag_response.chunks()) + { + std::vector> rows; + std::vector curr_row; + const std::string & data = chunk.rows_data(); + size_t cursor = 0; + while (cursor < data.size()) + { + curr_row.push_back(DB::DecodeDatum(cursor, data)); + if (curr_row.size() == schema.size()) + { + rows.emplace_back(std::move(curr_row)); + curr_row.clear(); + } + } + + ColumnsWithTypeAndName columns; + for (auto & field : schema) + { + const auto & name = field.first; + auto data_type = getDataTypeByFieldType(field.second); + ColumnWithTypeAndName col(data_type, name); + col.column->assumeMutable()->reserve(rows.size()); + columns.emplace_back(std::move(col)); + } + for (const auto & row : rows) + { + for (size_t i = 0; i < row.size(); i++) + { + columns[i].column->assumeMutable()->insert(row[i]); + } + } + + blocks.emplace_back(Block(columns)); + } + + return std::make_shared(std::move(blocks)); } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 2eb69d5c452..c2ef3890365 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -17,13 +17,14 @@ extern const int LOGICAL_ERROR; } DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_, - UInt64 region_conf_version_, tipb::SelectResponse & dag_response_) + UInt64 region_conf_version_, tipb::SelectResponse & dag_response_, bool internal_) : context(context_), dag_request(dag_request_), region_id(region_id_), region_version(region_version_), region_conf_version(region_conf_version_), - dag_response(dag_response_) + dag_response(dag_response_), + internal(internal_) {} void DAGDriver::execute() @@ -39,11 +40,11 @@ void DAGDriver::execute() DAGStringConverter converter(context, dag_request); String query = converter.buildSqlString(); if (!query.empty()) - streams = executeQuery(query, context, false, QueryProcessingStage::Complete); + streams = executeQuery(query, context, internal, QueryProcessingStage::Complete); } else if (planner == "optree") { - streams = executeQuery(dag, context, QueryProcessingStage::Complete); + streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete); } else { diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.h b/dbms/src/Flash/Coprocessor/DAGDriver.h index b0143591bd5..a9eda48b025 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.h +++ b/dbms/src/Flash/Coprocessor/DAGDriver.h @@ -15,7 +15,7 @@ class DAGDriver { public: DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_, - UInt64 region_conf_version_, tipb::SelectResponse & dag_response_); + UInt64 region_conf_version_, tipb::SelectResponse & dag_response_, bool internal_ = false); void execute(); @@ -29,5 +29,7 @@ class DAGDriver UInt64 region_conf_version; tipb::SelectResponse & dag_response; + + bool internal; }; } // namespace DB diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 698da73c9b6..6ebe2f72b50 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -392,10 +392,10 @@ BlockIO executeQuery( } -BlockIO executeQuery(DAGQuerySource & dag, Context & context, QueryProcessingStage::Enum stage) +BlockIO executeQuery(DAGQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage) { BlockIO streams; - std::tie(std::ignore, streams) = executeQueryImpl(dag, context, false, stage); + std::tie(std::ignore, streams) = executeQueryImpl(dag, context, internal, stage); return streams; } diff --git a/dbms/src/Interpreters/executeQuery.h b/dbms/src/Interpreters/executeQuery.h index 55b9ea7306a..ba784247f0b 100644 --- a/dbms/src/Interpreters/executeQuery.h +++ b/dbms/src/Interpreters/executeQuery.h @@ -41,6 +41,6 @@ BlockIO executeQuery( ); -BlockIO executeQuery(DAGQuerySource & dag, Context & context, QueryProcessingStage::Enum stage); +BlockIO executeQuery(DAGQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage); } From 63a5800bba9500277adf61d0b067e53fa0c564b0 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 8 Aug 2019 20:38:17 +0800 Subject: [PATCH 03/21] Column id starts from 1 --- dbms/src/Debug/MockTiDB.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 70c25cec551..e32fead22d2 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -174,7 +174,7 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na table_info.id = table_id_allocator++; table_info.name = table_name; - int i = 0; + int i = 1; for (auto & column : columns.getAllPhysical()) { table_info.columns.emplace_back(getColumnInfoFromColumn(column, i++)); From 232e7def44846682a965e0057812eacb4291aa68 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 8 Aug 2019 20:38:59 +0800 Subject: [PATCH 04/21] Fix value to ref --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 502b5884817..6a5cd7b6087 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -27,7 +27,7 @@ using DAGSchema = std::vector; std::tuple compileQuery(Context & context, const String & query, Timestamp start_ts); tipb::SelectResponse executeDAGRequest( Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version); -BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse dag_response); +BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response); BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) { @@ -109,12 +109,12 @@ tipb::SelectResponse executeDAGRequest( Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version) { tipb::SelectResponse dag_response; - DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response); + DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response, true); driver.execute(); return dag_response; } -BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse dag_response) +BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response) { BlocksList blocks; for (const auto & chunk : dag_response.chunks()) From 1b14a12a9a3fd4db82e4ed34cb905c62f9dc4770 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 8 Aug 2019 20:39:34 +0800 Subject: [PATCH 05/21] Add basic dag test --- tests/mutable-test/txn_dag/table_scan.test | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 tests/mutable-test/txn_dag/table_scan.test diff --git a/tests/mutable-test/txn_dag/table_scan.test b/tests/mutable-test/txn_dag/table_scan.test new file mode 100644 index 00000000000..bffd702eae1 --- /dev/null +++ b/tests/mutable-test/txn_dag/table_scan.test @@ -0,0 +1,20 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1') + +# DAG read +=> DBGInvoke dag('select * from default.test', 4) " --dag_planner="optree + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test From 25eb831ac219255388e40efa9978698020b9aa71 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 9 Aug 2019 03:31:59 +0800 Subject: [PATCH 06/21] Fix dag bugs and pass 1st mock test --- .../Coprocessor/DAGExpressionAnalyzer.cpp | 6 ++--- dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 26 +++++++++++++------ tests/mutable-test/txn_dag/table_scan.test | 3 +++ 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index 0e3779363a0..49ae38fb716 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -85,12 +85,10 @@ void DAGExpressionAnalyzer::appendAggregation( DataTypePtr result_type = aggregate.function->getReturnType(); // this is a temp result since implicit cast maybe added on these aggregated_columns aggregated_columns.emplace_back(func_string, result_type); + // TODO: No clear agg_argument_names??? } - for (auto name : agg_argument_names) - { - step.required_output.push_back(std::move(name)); - } + std::move(step.required_output.begin(), step.required_output.end(), std::back_inserter(agg_argument_names)); for (const tipb::Expr & expr : agg.group_by()) { diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 715d829a926..634b20748f7 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -176,19 +176,19 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() { AnalysisResult res; ExpressionActionsChain chain; - res.need_aggregate = dag.hasAggregation(); DAGExpressionAnalyzer analyzer(source_columns, context); if (dag.hasSelection()) { analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name); res.has_where = true; res.before_where = chain.getLastActions(); - res.filter_column_name = chain.steps.back().required_output[0]; chain.addStep(); } - if (res.need_aggregate) + // There will be either Agg... + if (dag.hasAggregation()) { analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions); + res.need_aggregate = true; res.before_aggregation = chain.getLastActions(); chain.finalize(); @@ -202,17 +202,22 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() final_project.emplace_back(element.name, ""); } } + // Or TopN, not both. if (dag.hasTopN()) { res.has_order_by = true; analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names); } - // append final project results - for (auto & name : final_project) + // Append final project results if needed. + if (dag.hasSelection() || dag.hasAggregation() || dag.hasTopN()) { - chain.steps.back().required_output.push_back(name.first); + // TODO: No new action added, file_project will be added last filter/agg/topN. OK??? + for (auto & name : final_project) + { + chain.steps.back().required_output.push_back(name.first); + } + res.before_order_and_select = chain.getLastActions(); } - res.before_order_and_select = chain.getLastActions(); chain.finalize(); chain.clear(); //todo need call prependProjectInput?? @@ -453,7 +458,11 @@ void InterpreterDAG::executeImpl(Pipeline & pipeline) executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregate_descriptions); recordProfileStreams(pipeline, dag.getAggregationIndex()); } - executeExpression(pipeline, res.before_order_and_select); + if (res.before_order_and_select) + { + executeExpression(pipeline, res.before_order_and_select); + // TODO: No record profile stream??? + } if (res.has_order_by) { @@ -464,6 +473,7 @@ void InterpreterDAG::executeImpl(Pipeline & pipeline) // execute projection executeFinalProject(pipeline); + // TODO: No record profile stream??? // execute limit if (dag.hasLimit() && !dag.hasTopN()) diff --git a/tests/mutable-test/txn_dag/table_scan.test b/tests/mutable-test/txn_dag/table_scan.test index bffd702eae1..36667375024 100644 --- a/tests/mutable-test/txn_dag/table_scan.test +++ b/tests/mutable-test/txn_dag/table_scan.test @@ -14,6 +14,9 @@ # DAG read => DBGInvoke dag('select * from default.test', 4) " --dag_planner="optree +┌─col_1─┐ +│ test1 │ +└───────┘ # Clean up. => DBGInvoke __drop_tidb_table(default, test) From 80f9fc6f1bb178570e3cd1e754fbd865a7b8c23b Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 9 Aug 2019 04:20:03 +0800 Subject: [PATCH 07/21] Make dag go normal routine and add mock dag --- dbms/src/Debug/DBGInvoker.cpp | 1 + dbms/src/Debug/dbgFuncCoprocessor.cpp | 59 ++++++++++++++++++++-- dbms/src/Debug/dbgFuncCoprocessor.h | 9 +++- tests/mutable-test/txn_dag/table_scan.test | 6 +++ 4 files changed, 68 insertions(+), 7 deletions(-) diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 565efed2269..dadb7285915 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -69,6 +69,7 @@ DBGInvoker::DBGInvoker() regSchemalessFunc("reset_schemas", dbgFuncResetSchemas); regSchemafulFunc("dag", dbgFuncDAG); + regSchemafulFunc("mock_dag", dbgFuncMockDAG); } void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement) diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 6a5cd7b6087..bb1cd84cf73 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -24,12 +25,54 @@ extern const int BAD_ARGUMENTS; using DAGField = std::pair; using DAGSchema = std::vector; -std::tuple compileQuery(Context & context, const String & query, Timestamp start_ts); +using SchemaFetcher = std::function; +std::tuple compileQuery( + Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts); tipb::SelectResponse executeDAGRequest( Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version); BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response); BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) +{ + if (args.size() < 1 || args.size() > 2) + throw Exception("Args not matched, should be: query[, region-id]", ErrorCodes::BAD_ARGUMENTS); + + String query = safeGet(typeid_cast(*args[0]).value); + RegionID region_id = InvalidRegionID; + if (args.size() == 2) + region_id = safeGet(typeid_cast(*args[1]).value); + Timestamp start_ts = context.getTMTContext().getPDClient()->getTS(); + + auto [table_id, schema, dag_request] = compileQuery(context, query, + [&](const String & database_name, const String & table_name) { + auto storage = context.getTable(database_name, table_name); + auto mmt = std::dynamic_pointer_cast(storage); + if (!mmt || mmt->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) + throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS); + return mmt->getTableInfo(); + }, + start_ts); + + RegionPtr region; + if (region_id == InvalidRegionID) + { + auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id); + if (regions.empty()) + throw Exception("No region for table", ErrorCodes::BAD_ARGUMENTS); + region = context.getTMTContext().getRegionTable().getRegionsByTable(table_id).front().second; + } + else + { + region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, region_id); + if (!region) + throw Exception("No such region", ErrorCodes::BAD_ARGUMENTS); + } + tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer()); + + return outputDAGResponse(context, schema, dag_response); +} + +BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) { if (args.size() < 2 || args.size() > 3) throw Exception("Args not matched, should be: query, region-id[, start-ts]", ErrorCodes::BAD_ARGUMENTS); @@ -42,14 +85,20 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) if (start_ts == 0) start_ts = context.getTMTContext().getPDClient()->getTS(); + auto [table_id, schema, dag_request] = compileQuery(context, query, + [&](const String & database_name, const String & table_name) { + return MockTiDB::instance().getTableByName(database_name, table_name)->table_info; + }, + start_ts); + RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id); - auto [schema, dag_request] = compileQuery(context, query, start_ts); tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer()); return outputDAGResponse(context, schema, dag_response); } -std::tuple compileQuery(Context & context, const String & query, Timestamp start_ts) +std::tuple compileQuery( + Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts) { DAGSchema schema; tipb::DAGRequest dag_request; @@ -76,7 +125,7 @@ std::tuple compileQuery(Context & context, const St { database_name = context.getCurrentDatabase(); } - const auto & table_info = MockTiDB::instance().getTableByName(database_name, table_name)->table_info; + auto table_info = schema_fetcher(database_name, table_name); tipb::Executor * executor = dag_request.add_executors(); executor->set_tp(tipb::ExecType::TypeTableScan); @@ -102,7 +151,7 @@ std::tuple compileQuery(Context & context, const St i++; } - return std::make_pair(std::move(schema), std::move(dag_request)); + return std::make_tuple(table_info.id, std::move(schema), std::move(dag_request)); } tipb::SelectResponse executeDAGRequest( diff --git a/dbms/src/Debug/dbgFuncCoprocessor.h b/dbms/src/Debug/dbgFuncCoprocessor.h index ec18ea7428d..eb8cc989fd5 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.h +++ b/dbms/src/Debug/dbgFuncCoprocessor.h @@ -10,9 +10,14 @@ class Context; // Coprocessor debug tools -// Run a DAG request using given query (will be compiled to DAG executors), region ID and start ts. +// Run a DAG request using given query that will be compiled to DAG request, with the given (optional) region ID. // Usage: -// ./storages-client.sh "DBGInvoke dag(query, region_id, start_ts)" +// ./storages-client.sh "DBGInvoke dag(query[, region_id])" BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args); +// Mock a DAG request using given query that will be compiled (with the metadata from MockTiDB) to DAG request, with the given region ID and (optional) start ts. +// Usage: +// ./storages-client.sh "DBGInvoke mock_dag(query, region_id[, start_ts])" +BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args); + } // namespace DB diff --git a/tests/mutable-test/txn_dag/table_scan.test b/tests/mutable-test/txn_dag/table_scan.test index 36667375024..f18b2f34498 100644 --- a/tests/mutable-test/txn_dag/table_scan.test +++ b/tests/mutable-test/txn_dag/table_scan.test @@ -18,6 +18,12 @@ │ test1 │ └───────┘ +# Mock DAG read +=> DBGInvoke mock_dag('select * from default.test', 4) " --dag_planner="optree +┌─col_1─┐ +│ test1 │ +└───────┘ + # Clean up. => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test From a1173e1ae659d94aba085f7243b9f719062c4f3e Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 9 Aug 2019 04:22:53 +0800 Subject: [PATCH 08/21] Add todo --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index bb1cd84cf73..62b5c799493 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -151,6 +151,8 @@ std::tuple compileQuery( i++; } + // TODO: Other operator compile. + return std::make_tuple(table_info.id, std::move(schema), std::move(dag_request)); } From c8109f63670e48eb543728d0e4f595bf70ca98f6 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 9 Aug 2019 04:35:33 +0800 Subject: [PATCH 09/21] Add comment --- dbms/src/Flash/Coprocessor/DAGContext.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 9221dc38bef..30e492f360f 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -8,10 +8,12 @@ namespace DB class Context; +/// A context used to track the information that needs to be passed around during DAG planning. class DAGContext { public: DAGContext(size_t profile_list_size) { profile_streams_list.resize(profile_list_size); }; std::vector profile_streams_list; }; + } // namespace DB From 7dc039750c39cf1de53f4d1592c45c6a132db701 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 9 Aug 2019 07:46:40 +0800 Subject: [PATCH 10/21] Fix gcc compile error --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 62b5c799493..1277b3fe5e4 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -90,6 +90,7 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) return MockTiDB::instance().getTableByName(database_name, table_name)->table_info; }, start_ts); + std::ignore = table_id; RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id); tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer()); From 66d9e8abdccc581ba1948e7849f586e15b970a0e Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 9 Aug 2019 07:46:50 +0800 Subject: [PATCH 11/21] Enhance dag test --- tests/mutable-test/txn_dag/table_scan.test | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/mutable-test/txn_dag/table_scan.test b/tests/mutable-test/txn_dag/table_scan.test index f18b2f34498..28d6599f6de 100644 --- a/tests/mutable-test/txn_dag/table_scan.test +++ b/tests/mutable-test/txn_dag/table_scan.test @@ -12,13 +12,19 @@ => DBGInvoke __put_region(4, 0, 100, default, test) => DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1') -# DAG read +# DAG read by not specifying region id. +=> DBGInvoke dag('select * from default.test') " --dag_planner="optree +┌─col_1─┐ +│ test1 │ +└───────┘ + +# DAG read by explicitly specifying region id. => DBGInvoke dag('select * from default.test', 4) " --dag_planner="optree ┌─col_1─┐ │ test1 │ └───────┘ -# Mock DAG read +# Mock DAG read. => DBGInvoke mock_dag('select * from default.test', 4) " --dag_planner="optree ┌─col_1─┐ │ test1 │ From 36d11179ce3c9aff34cfab30e7fe51f3e2b2e57c Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 9 Aug 2019 15:23:36 +0800 Subject: [PATCH 12/21] Address comments --- dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp | 3 +-- dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index 49ae38fb716..e407d5c0d6b 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -85,10 +85,9 @@ void DAGExpressionAnalyzer::appendAggregation( DataTypePtr result_type = aggregate.function->getReturnType(); // this is a temp result since implicit cast maybe added on these aggregated_columns aggregated_columns.emplace_back(func_string, result_type); - // TODO: No clear agg_argument_names??? } - std::move(step.required_output.begin(), step.required_output.end(), std::back_inserter(agg_argument_names)); + std::move(agg_argument_names.begin(), agg_argument_names.end(), std::back_inserter(step.required_output)); for (const tipb::Expr & expr : agg.group_by()) { diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 634b20748f7..e8eea5a1491 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -209,9 +209,9 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names); } // Append final project results if needed. + // TODO: Refine this logic by an `analyzer.appendFinalProject()`-like call. if (dag.hasSelection() || dag.hasAggregation() || dag.hasTopN()) { - // TODO: No new action added, file_project will be added last filter/agg/topN. OK??? for (auto & name : final_project) { chain.steps.back().required_output.push_back(name.first); @@ -461,7 +461,6 @@ void InterpreterDAG::executeImpl(Pipeline & pipeline) if (res.before_order_and_select) { executeExpression(pipeline, res.before_order_and_select); - // TODO: No record profile stream??? } if (res.has_order_by) @@ -473,7 +472,6 @@ void InterpreterDAG::executeImpl(Pipeline & pipeline) // execute projection executeFinalProject(pipeline); - // TODO: No record profile stream??? // execute limit if (dag.hasLimit() && !dag.hasTopN()) From a9fe9f923da32450fe038d42370345b675d504ca Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Tue, 13 Aug 2019 01:42:05 +0800 Subject: [PATCH 13/21] Enhance mock sql -> dag compiler and add project test --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 223 +++++++++++++++++---- tests/mutable-test/txn_dag/project.test | 41 ++++ tests/mutable-test/txn_dag/table_scan.test | 2 +- 3 files changed, 231 insertions(+), 35 deletions(-) create mode 100644 tests/mutable-test/txn_dag/project.test diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 84882c4597d..e0fba0ec68c 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include #include @@ -21,6 +23,7 @@ namespace DB namespace ErrorCodes { extern const int BAD_ARGUMENTS; +extern const int LOGICA_ERROR; } // namespace ErrorCodes using DAGField = std::pair; @@ -98,6 +101,42 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) return outputDAGResponse(context, schema, dag_response); } +struct ExecutorCtx +{ + tipb::Executor * input; + DAGSchema output; + std::unordered_map col_ref_map; +}; + +void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::unordered_set & referred_columns, + std::unordered_map col_ref_map) +{ + if (ASTIdentifier * id = typeid_cast(ast.get())) + { + auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) { return field.first == id->getColumnName(); }); + if (ft == input.end()) + throw DB::Exception("No such column " + id->getColumnName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + expr->set_tp(tipb::ColumnRef); + *(expr->mutable_field_type()) = (*ft).second; + + referred_columns.emplace((*ft).first); + col_ref_map.emplace((*ft).first, expr); + } + // else if (ASTFunction * func = typeid_cast(ast.get())) + // { + // } + else + { + throw DB::Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR); + } + + for (const auto & child_ast : ast->children) + { + tipb::Expr * child = expr->add_children(); + compileExpr(input, child_ast, child, referred_columns, col_ref_map); + } +}; + std::tuple compileQuery( Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts) { @@ -110,49 +149,162 @@ std::tuple compileQuery( ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "from DAG compiler", 0); ASTSelectQuery & ast_query = typeid_cast(*ast); - String database_name, table_name; - auto query_database = ast_query.database(); - auto query_table = ast_query.table(); - if (query_database) - database_name = typeid_cast(*query_database).name; - if (query_table) - table_name = typeid_cast(*query_table).name; - if (!query_table) + /// Get table metadata. + TiDB::TableInfo table_info; + { + String database_name, table_name; + auto query_database = ast_query.database(); + auto query_table = ast_query.table(); + if (query_database) + database_name = typeid_cast(*query_database).name; + if (query_table) + table_name = typeid_cast(*query_table).name; + if (!query_table) + { + database_name = "system"; + table_name = "one"; + } + else if (!query_database) + { + database_name = context.getCurrentDatabase(); + } + + table_info = schema_fetcher(database_name, table_name); + } + + std::map executor_ctx_map; + std::unordered_set referred_columns; + tipb::TableScan * ts = nullptr; + tipb::Executor * last_executor = nullptr; + + /// Table scan. + { + tipb::Executor * ts_exec = dag_request.add_executors(); + ts_exec->set_tp(tipb::ExecType::TypeTableScan); + ts = ts_exec->mutable_tbl_scan(); + ts->set_table_id(table_info.id); + DAGSchema ts_output; + for (const auto & column_info : table_info.columns) + { + tipb::FieldType field_type; + field_type.set_tp(column_info.tp); + field_type.set_flag(column_info.flag); + field_type.set_flen(column_info.flen); + field_type.set_decimal(column_info.decimal); + ts_output.emplace_back(std::make_pair(column_info.name, std::move(field_type))); + } + executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map{}}); + last_executor = ts_exec; + } + + /// Filter. + if (ast_query.where_expression) { - database_name = "system"; - table_name = "one"; + tipb::Executor * filter_exec = dag_request.add_executors(); + filter_exec->set_tp(tipb::ExecType::TypeSelection); + tipb::Selection * filter = filter_exec->mutable_selection(); + tipb::Expr * cond = filter->add_conditions(); + std::unordered_map col_ref_map; + compileExpr(executor_ctx_map[last_executor].output, ast_query.where_expression, cond, referred_columns, col_ref_map); + executor_ctx_map.emplace(filter_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::move(col_ref_map)}); + last_executor = filter_exec; } - else if (!query_database) + + /// TopN. + if (ast_query.order_expression_list && ast_query.limit_length) { - database_name = context.getCurrentDatabase(); + tipb::Executor * topn_exec = dag_request.add_executors(); + topn_exec->set_tp(tipb::ExecType::TypeTopN); + tipb::TopN * topN = topn_exec->mutable_topn(); + std::unordered_map col_ref_map; + for (const auto & child : ast_query.order_expression_list->children) + { + tipb::ByItem * by = topN->add_order_by(); + tipb::Expr * expr = by->mutable_expr(); + compileExpr(executor_ctx_map[last_executor].output, child, expr, referred_columns, col_ref_map); + } + auto limit = safeGet(typeid_cast(*ast_query.limit_length).value); + topN->set_limit(limit); + executor_ctx_map.emplace(topn_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::move(col_ref_map)}); + last_executor = topn_exec; } - auto table_info = schema_fetcher(database_name, table_name); - - tipb::Executor * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeTableScan); - tipb::TableScan * ts = executor->mutable_tbl_scan(); - ts->set_table_id(table_info.id); - size_t i = 0; - for (const auto & column_info : table_info.columns) + + /// Aggregation. + if (ast_query.group_expression_list) {} + if (ast_query.select_expression_list /* select_expression_list has agg*/) {} + + /// Finalize. + if (!last_executor->has_aggregation()) { - tipb::ColumnInfo * ci = ts->add_columns(); - ci->set_column_id(column_info.id); - ci->set_tp(column_info.tp); - ci->set_flag(column_info.flag); + std::vector final_output; + for (const auto & expr : ast_query.select_expression_list->children) + { + if (ASTIdentifier * id = typeid_cast(expr.get())) + { + referred_columns.emplace(id->getColumnName()); + final_output.emplace_back(id->getColumnName()); + } + else if (typeid_cast(expr.get())) + { + const auto & last_output = executor_ctx_map[last_executor].output; + for (const auto & field : last_output) + { + referred_columns.emplace(field.first); + final_output.push_back(field.first); + } + } + else + { + throw DB::Exception("Unsupported expression type in select", ErrorCodes::LOGICAL_ERROR); + } + } - tipb::FieldType field_type; - field_type.set_tp(column_info.tp); - field_type.set_flag(column_info.flag); - field_type.set_flen(column_info.flen); - field_type.set_decimal(column_info.decimal); - schema.emplace_back(std::make_pair(column_info.name, std::move(field_type))); + std::function column_pruner = [&](ExecutorCtx & executor_ctx) { + if (!executor_ctx.input) + { + executor_ctx.output.erase(std::remove_if(executor_ctx.output.begin(), executor_ctx.output.end(), + [&](const auto & field) { return referred_columns.count(field.first) == 0; }), + executor_ctx.output.end()); - dag_request.add_output_offsets(i); + for (const auto & field : executor_ctx.output) + { + tipb::ColumnInfo * ci = ts->add_columns(); + ci->set_column_id(table_info.getColumnID(field.first)); + ci->set_tp(field.second.tp()); + ci->set_flag(field.second.flag()); + ci->set_columnlen(field.second.flen()); + ci->set_decimal(field.second.decimal()); + } - i++; - } + return; + } + column_pruner(executor_ctx_map[executor_ctx.input]); + const auto & last_output = executor_ctx_map[executor_ctx.input].output; + for (const auto & pair : executor_ctx.col_ref_map) + { + auto iter + = std::find_if(last_output.begin(), last_output.end(), [&](const auto & field) { return field.first == pair.first; }); + if (iter == last_output.end()) + throw DB::Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR); + std::stringstream ss; + DB::EncodeNumber(iter - last_output.begin(), ss); + pair.second->set_val(ss.str()); + } + executor_ctx.output = last_output; + }; + column_pruner(executor_ctx_map[last_executor]); - // TODO: Other operator compile. + const auto & last_output = executor_ctx_map[last_executor].output; + for (const auto & field : final_output) + { + auto iter + = std::find_if(last_output.begin(), last_output.end(), [&](const auto & last_field) { return last_field.first == field; }); + if (iter == last_output.end()) + throw DB::Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR); + dag_request.add_output_offsets(iter - last_output.begin()); + schema.push_back(*iter); + } + } return std::make_tuple(table_info.id, std::move(schema), std::move(dag_request)); } @@ -160,9 +312,12 @@ std::tuple compileQuery( tipb::SelectResponse executeDAGRequest( Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version) { + Logger * log = &Logger::get("MockDAG"); + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString()); tipb::SelectResponse dag_response; DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response, true); driver.execute(); + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done"); return dag_response; } diff --git a/tests/mutable-test/txn_dag/project.test b/tests/mutable-test/txn_dag/project.test new file mode 100644 index 00000000000..8b29b4a7a08 --- /dev/null +++ b/tests/mutable-test/txn_dag/project.test @@ -0,0 +1,41 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) + +# DAG read by not specifying region id, select *. +=> DBGInvoke dag('select * from default.test') " --dag_planner="optree +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +└───────┴───────┘ + +# DAG read by not specifying region id, select col_1. +=> DBGInvoke dag('select col_1 from default.test') " --dag_planner="optree +┌─col_1─┐ +│ test1 │ +└───────┘ + +# DAG read by explicitly specifying region id, select col_2. +=> DBGInvoke dag('select col_2 from default.test', 4) " --dag_planner="optree +┌─col_2─┐ +│ 666 │ +└───────┘ + +# Mock DAG read, select col_2, col_1, col_2. +=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test', 4) " --dag_planner="optree +┌─col_2─┬─col_1─┬─col_2─┐ +│ 666 │ test1 │ 666 │ +└───────┴───────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_dag/table_scan.test b/tests/mutable-test/txn_dag/table_scan.test index 28d6599f6de..953af0cef9d 100644 --- a/tests/mutable-test/txn_dag/table_scan.test +++ b/tests/mutable-test/txn_dag/table_scan.test @@ -6,7 +6,7 @@ => DBGInvoke __set_flush_threshold(1000000, 1000000) -# Data +# Data. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String') => DBGInvoke __refresh_schemas() => DBGInvoke __put_region(4, 0, 100, default, test) From 1372262f1866a54f178dc3d6da45697a92cc0fa6 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Tue, 13 Aug 2019 17:30:53 +0800 Subject: [PATCH 14/21] Mock sql dag compiler support more expression types and add filter test --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 99 ++++++++++++++++++++++---- dbms/src/Storages/Transaction/TiDB.h | 10 +++ tests/mutable-test/txn_dag/filter.test | 37 ++++++++++ 3 files changed, 133 insertions(+), 13 deletions(-) create mode 100644 tests/mutable-test/txn_dag/filter.test diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index e0fba0ec68c..0b2a959ce9e 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -109,7 +109,7 @@ struct ExecutorCtx }; void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::unordered_set & referred_columns, - std::unordered_map col_ref_map) + std::unordered_map & col_ref_map) { if (ASTIdentifier * id = typeid_cast(ast.get())) { @@ -122,20 +122,89 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un referred_columns.emplace((*ft).first); col_ref_map.emplace((*ft).first, expr); } - // else if (ASTFunction * func = typeid_cast(ast.get())) - // { - // } - else + else if (ASTFunction * func = typeid_cast(ast.get())) { - throw DB::Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR); - } + // TODO: Support agg functions. + for (const auto & child_ast : func->arguments->children) + { + tipb::Expr * child = expr->add_children(); + compileExpr(input, child_ast, child, referred_columns, col_ref_map); + } - for (const auto & child_ast : ast->children) + String func_name_lowercase = Poco::toLower(func->name); + // TODO: Support more functions. + // TODO: Support type inference. + if (func_name_lowercase == "equals") + { + expr->set_sig(tipb::ScalarFuncSig::EQInt); + auto * ft = expr->mutable_field_type(); + // TODO: TiDB will infer Int64. + ft->set_tp(TiDB::TypeTiny); + ft->set_flag(TiDB::ColumnFlagUnsigned); + } + else if (func_name_lowercase == "and") + { + expr->set_sig(tipb::ScalarFuncSig::LogicalAnd); + auto * ft = expr->mutable_field_type(); + // TODO: TiDB will infer Int64. + ft->set_tp(TiDB::TypeTiny); + ft->set_flag(TiDB::ColumnFlagUnsigned); + } + else if (func_name_lowercase == "or") + { + expr->set_sig(tipb::ScalarFuncSig::LogicalOr); + auto * ft = expr->mutable_field_type(); + // TODO: TiDB will infer Int64. + ft->set_tp(TiDB::TypeTiny); + ft->set_flag(TiDB::ColumnFlagUnsigned); + } + else + { + throw DB::Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR); + } + expr->set_tp(tipb::ExprType::ScalarFunc); + } + else if (ASTLiteral * lit = typeid_cast(ast.get())) { - tipb::Expr * child = expr->add_children(); - compileExpr(input, child_ast, child, referred_columns, col_ref_map); + TiDB::CodecFlag codec_flag; + switch (lit->value.getType()) + { + case Field::Types::Which::Null: + expr->set_tp(tipb::Null); + codec_flag = TiDB::CodecFlagNil; + break; + case Field::Types::Which::UInt64: + expr->set_tp(tipb::Uint64); + codec_flag = TiDB::CodecFlagUInt; + break; + case Field::Types::Which::Int64: + expr->set_tp(tipb::Int64); + codec_flag = TiDB::CodecFlagInt; + break; + case Field::Types::Which::Float64: + expr->set_tp(tipb::Float64); + codec_flag = TiDB::CodecFlagFloat; + break; + case Field::Types::Which::Decimal: + expr->set_tp(tipb::MysqlDecimal); + codec_flag = TiDB::CodecFlagDecimal; + break; + case Field::Types::Which::String: + expr->set_tp(tipb::String); + codec_flag = TiDB::CodecFlagCompactBytes; + break; + default: + throw DB::Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR); + } + std::stringstream ss; + DB::EncodeDatum(lit->value, codec_flag, ss); + expr->set_val(ss.str()); } -}; + else + { + throw DB::Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR); + } +} std::tuple compileQuery( Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts) @@ -172,7 +241,7 @@ std::tuple compileQuery( table_info = schema_fetcher(database_name, table_name); } - std::map executor_ctx_map; + std::unordered_map executor_ctx_map; std::unordered_set referred_columns; tipb::TableScan * ts = nullptr; tipb::Executor * last_executor = nullptr; @@ -312,8 +381,9 @@ std::tuple compileQuery( tipb::SelectResponse executeDAGRequest( Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version) { - Logger * log = &Logger::get("MockDAG"); + static Logger * log = &Logger::get("MockDAG"); LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString()); + context.setSetting("dag_planner", "optree"); tipb::SelectResponse dag_response; DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response, true); driver.execute(); @@ -323,6 +393,9 @@ tipb::SelectResponse executeDAGRequest( BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response) { + if (dag_response.has_error()) + throw DB::Exception(dag_response.error().msg(), dag_response.error().code()); + BlocksList blocks; for (const auto & chunk : dag_response.chunks()) { diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index cd6fc0651d9..dfde0a38a1e 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -97,6 +97,16 @@ enum TP M(PartKey, (1 << 14)) \ M(Num, (1 << 15)) +enum ColumnFlag +{ +#ifdef M +#error "Please undefine macro M first." +#endif +#define M(cf, v) ColumnFlag##cf = v, + COLUMN_FLAGS(M) +#undef M +}; + // Codec flags. // In format: TiDB codec flag, int value. #ifdef M diff --git a/tests/mutable-test/txn_dag/filter.test b/tests/mutable-test/txn_dag/filter.test new file mode 100644 index 00000000000..9045b9da1b4 --- /dev/null +++ b/tests/mutable-test/txn_dag/filter.test @@ -0,0 +1,37 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777) + +# DAG read by not specifying region id, where col_1 = 666. +=> DBGInvoke dag('select * from default.test where col_2 = 666') +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +└───────┴───────┘ + +# DAG read by explicitly specifying region id, where col_2 = 'test2'. +=> DBGInvoke dag('select col_2 from default.test where col_1 = \'test2\'', 4) +┌─col_2─┐ +│ 777 │ +└───────┘ + +# Mock DAG read, where or. +=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666', 4) +┌─col_2─┬─col_1─┬─col_2─┐ +│ 666 │ test1 │ 666 │ +│ 777 │ test2 │ 777 │ +└───────┴───────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test From e2f9a024144540e0df71ea113d459b39a378d8bf Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Tue, 13 Aug 2019 18:39:28 +0800 Subject: [PATCH 15/21] Add topn and limit test --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 24 +++++++++++++++++---- tests/mutable-test/txn_dag/limit.test | 31 +++++++++++++++++++++++++++ tests/mutable-test/txn_dag/topn.test | 30 ++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 4 deletions(-) create mode 100644 tests/mutable-test/txn_dag/limit.test create mode 100644 tests/mutable-test/txn_dag/topn.test diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 0b2a959ce9e..3304f0afde1 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -284,19 +285,34 @@ std::tuple compileQuery( { tipb::Executor * topn_exec = dag_request.add_executors(); topn_exec->set_tp(tipb::ExecType::TypeTopN); - tipb::TopN * topN = topn_exec->mutable_topn(); + tipb::TopN * topn = topn_exec->mutable_topn(); std::unordered_map col_ref_map; for (const auto & child : ast_query.order_expression_list->children) { - tipb::ByItem * by = topN->add_order_by(); + ASTOrderByElement * elem = typeid_cast(child.get()); + if (!elem) + throw DB::Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR); + tipb::ByItem * by = topn->add_order_by(); + by->set_desc(elem->direction < 0); tipb::Expr * expr = by->mutable_expr(); - compileExpr(executor_ctx_map[last_executor].output, child, expr, referred_columns, col_ref_map); + compileExpr(executor_ctx_map[last_executor].output, elem->children[0], expr, referred_columns, col_ref_map); } auto limit = safeGet(typeid_cast(*ast_query.limit_length).value); - topN->set_limit(limit); + topn->set_limit(limit); executor_ctx_map.emplace(topn_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::move(col_ref_map)}); last_executor = topn_exec; } + else if (ast_query.limit_length) + { + tipb::Executor * limit_exec = dag_request.add_executors(); + limit_exec->set_tp(tipb::ExecType::TypeLimit); + tipb::Limit * limit = limit_exec->mutable_limit(); + auto limit_length = safeGet(typeid_cast(*ast_query.limit_length).value); + limit->set_limit(limit_length); + executor_ctx_map.emplace( + limit_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map{}}); + last_executor = limit_exec; + } /// Aggregation. if (ast_query.group_expression_list) {} diff --git a/tests/mutable-test/txn_dag/limit.test b/tests/mutable-test/txn_dag/limit.test new file mode 100644 index 00000000000..ee8d97f75a7 --- /dev/null +++ b/tests/mutable-test/txn_dag/limit.test @@ -0,0 +1,31 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test1', 666) + +# DAG read by not specifying region id, order by col_2 limit 1. +=> DBGInvoke dag('select * from default.test') +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +│ test1 │ 666 │ +└───────┴───────┘ + +# Mock DAG read, where + topn. +=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_2 = 666 limit 1', 4) +┌─col_2─┬─col_1─┬─col_2─┐ +│ 666 │ test1 │ 666 │ +└───────┴───────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_dag/topn.test b/tests/mutable-test/txn_dag/topn.test new file mode 100644 index 00000000000..1708402ca40 --- /dev/null +++ b/tests/mutable-test/txn_dag/topn.test @@ -0,0 +1,30 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777) + +# DAG read by not specifying region id, order by col_2 limit 1. +=> DBGInvoke dag('select * from default.test order by col_2 limit 1') +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +└───────┴───────┘ + +# Mock DAG read, where + topn. +=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666 order by col_1 desc limit 1', 4) +┌─col_2─┬─col_1─┬─col_2─┐ +│ 777 │ test2 │ 777 │ +└───────┴───────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test From 8cea243df0c332a3cfcb3e387e4dbe20bd98e3ca Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Wed, 14 Aug 2019 01:08:10 +0800 Subject: [PATCH 16/21] Add agg for sql -> dag parser and agg test --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 142 +++++++++++++++----- tests/mutable-test/txn_dag/aggregation.test | 32 +++++ 2 files changed, 137 insertions(+), 37 deletions(-) create mode 100644 tests/mutable-test/txn_dag/aggregation.test diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 3304f0afde1..83baa92f10d 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -314,11 +315,111 @@ std::tuple compileQuery( last_executor = limit_exec; } - /// Aggregation. - if (ast_query.group_expression_list) {} - if (ast_query.select_expression_list /* select_expression_list has agg*/) {} + /// Column pruner. + std::function column_pruner = [&](ExecutorCtx & executor_ctx) { + if (!executor_ctx.input) + { + executor_ctx.output.erase(std::remove_if(executor_ctx.output.begin(), executor_ctx.output.end(), + [&](const auto & field) { return referred_columns.count(field.first) == 0; }), + executor_ctx.output.end()); + + for (const auto & field : executor_ctx.output) + { + tipb::ColumnInfo * ci = ts->add_columns(); + ci->set_column_id(table_info.getColumnID(field.first)); + ci->set_tp(field.second.tp()); + ci->set_flag(field.second.flag()); + ci->set_columnlen(field.second.flen()); + ci->set_decimal(field.second.decimal()); + } + + return; + } + column_pruner(executor_ctx_map[executor_ctx.input]); + const auto & last_output = executor_ctx_map[executor_ctx.input].output; + for (const auto & pair : executor_ctx.col_ref_map) + { + auto iter = std::find_if(last_output.begin(), last_output.end(), [&](const auto & field) { return field.first == pair.first; }); + if (iter == last_output.end()) + throw DB::Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR); + std::stringstream ss; + DB::EncodeNumber(iter - last_output.begin(), ss); + pair.second->set_val(ss.str()); + } + executor_ctx.output = last_output; + }; + + /// Aggregation finalize. + { + bool has_gby = ast_query.group_expression_list != nullptr; + bool has_agg_func = false; + for (const auto & child : ast_query.select_expression_list->children) + { + const ASTFunction * func = typeid_cast(child.get()); + if (func && AggregateFunctionFactory::instance().isAggregateFunctionName(func->name)) + { + has_agg_func = true; + break; + } + } + + if (has_gby || has_agg_func) + { + if (last_executor->has_limit() || last_executor->has_topn()) + throw DB::Exception("Limit/TopN and Agg cannot co-exist.", ErrorCodes::LOGICAL_ERROR); + + tipb::Executor * agg_exec = dag_request.add_executors(); + agg_exec->set_tp(tipb::ExecType::TypeAggregation); + tipb::Aggregation * agg = agg_exec->mutable_aggregation(); + std::unordered_map col_ref_map; + for (const auto & expr : ast_query.select_expression_list->children) + { + const ASTFunction * func = typeid_cast(expr.get()); + if (!func || !AggregateFunctionFactory::instance().isAggregateFunctionName(func->name)) + throw DB::Exception("Only agg function is allowed in select for a query with aggregation", ErrorCodes::LOGICAL_ERROR); + + tipb::Expr * agg_func = agg->add_agg_func(); + + for (const auto & arg : func->arguments->children) + { + tipb::Expr * arg_expr = agg_func->add_children(); + compileExpr(executor_ctx_map[last_executor].output, arg, arg_expr, referred_columns, col_ref_map); + } - /// Finalize. + if (func->name == "count") + { + agg_func->set_tp(tipb::Count); + auto ft = agg_func->mutable_field_type(); + ft->set_tp(TiDB::TypeLongLong); + ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); + } + // TODO: Other agg func. + else + { + throw DB::Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR); + } + + schema.emplace_back(std::make_pair(func->getColumnName(), agg_func->field_type())); + } + + if (has_gby) + { + for (const auto & child : ast_query.group_expression_list->children) + { + tipb::Expr * gby = agg->add_group_by(); + compileExpr(executor_ctx_map[last_executor].output, child, gby, referred_columns, col_ref_map); + schema.emplace_back(std::make_pair(child->getColumnName(), gby->field_type())); + } + } + + executor_ctx_map.emplace(agg_exec, ExecutorCtx{last_executor, DAGSchema{}, std::move(col_ref_map)}); + last_executor = agg_exec; + + column_pruner(executor_ctx_map[last_executor]); + } + } + + /// Non-aggregation finalize. if (!last_executor->has_aggregation()) { std::vector final_output; @@ -344,39 +445,6 @@ std::tuple compileQuery( } } - std::function column_pruner = [&](ExecutorCtx & executor_ctx) { - if (!executor_ctx.input) - { - executor_ctx.output.erase(std::remove_if(executor_ctx.output.begin(), executor_ctx.output.end(), - [&](const auto & field) { return referred_columns.count(field.first) == 0; }), - executor_ctx.output.end()); - - for (const auto & field : executor_ctx.output) - { - tipb::ColumnInfo * ci = ts->add_columns(); - ci->set_column_id(table_info.getColumnID(field.first)); - ci->set_tp(field.second.tp()); - ci->set_flag(field.second.flag()); - ci->set_columnlen(field.second.flen()); - ci->set_decimal(field.second.decimal()); - } - - return; - } - column_pruner(executor_ctx_map[executor_ctx.input]); - const auto & last_output = executor_ctx_map[executor_ctx.input].output; - for (const auto & pair : executor_ctx.col_ref_map) - { - auto iter - = std::find_if(last_output.begin(), last_output.end(), [&](const auto & field) { return field.first == pair.first; }); - if (iter == last_output.end()) - throw DB::Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR); - std::stringstream ss; - DB::EncodeNumber(iter - last_output.begin(), ss); - pair.second->set_val(ss.str()); - } - executor_ctx.output = last_output; - }; column_pruner(executor_ctx_map[last_executor]); const auto & last_output = executor_ctx_map[last_executor].output; diff --git a/tests/mutable-test/txn_dag/aggregation.test b/tests/mutable-test/txn_dag/aggregation.test new file mode 100644 index 00000000000..0f8ec4c30e3 --- /dev/null +++ b/tests/mutable-test/txn_dag/aggregation.test @@ -0,0 +1,32 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test3', 777) + +# DAG read by not specifying region id, group by. +=> DBGInvoke dag('select count(col_1) from default.test group by col_2') +┌─count(col_1)─┬─col_2─┐ +│ 2 │ 666 │ +│ 1 │ 777 │ +└──────────────┴───────┘ + +# DAG read by explicitly specifying region id, where + group by. +=> DBGInvoke dag('select count(col_1) from default.test where col_2 = 666 group by col_2', 4) +┌─count(col_1)─┬─col_2─┐ +│ 2 │ 666 │ +└──────────────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test From 8fb4d528999f8faf0cf59793bcd38544b612abfd Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 15 Aug 2019 13:26:45 +0800 Subject: [PATCH 17/21] Add dag specific codec --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 20 ++++++++++---------- dbms/src/Flash/Coprocessor/DAGCodec.h | 23 +++++++++++++++++++++++ 2 files changed, 33 insertions(+), 10 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/DAGCodec.h diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 83baa92f10d..5287e5a97ba 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -168,38 +169,37 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un } else if (ASTLiteral * lit = typeid_cast(ast.get())) { - TiDB::CodecFlag codec_flag; + std::stringstream ss; switch (lit->value.getType()) { case Field::Types::Which::Null: expr->set_tp(tipb::Null); - codec_flag = TiDB::CodecFlagNil; + // Null literal epxr doesn't need value. break; case Field::Types::Which::UInt64: expr->set_tp(tipb::Uint64); - codec_flag = TiDB::CodecFlagUInt; + encodeDAGUInt64(lit->value.get(), ss); break; case Field::Types::Which::Int64: expr->set_tp(tipb::Int64); - codec_flag = TiDB::CodecFlagInt; + encodeDAGInt64(lit->value.get(), ss); break; case Field::Types::Which::Float64: expr->set_tp(tipb::Float64); - codec_flag = TiDB::CodecFlagFloat; + encodeDAGFloat64(lit->value.get(), ss); break; case Field::Types::Which::Decimal: expr->set_tp(tipb::MysqlDecimal); - codec_flag = TiDB::CodecFlagDecimal; + encodeDAGDecimal(lit->value.get(), ss); break; case Field::Types::Which::String: expr->set_tp(tipb::String); - codec_flag = TiDB::CodecFlagCompactBytes; + // TODO: Align with TiDB. + encodeDAGBytes(lit->value.get(), ss); break; default: throw DB::Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR); } - std::stringstream ss; - DB::EncodeDatum(lit->value, codec_flag, ss); expr->set_val(ss.str()); } else @@ -343,7 +343,7 @@ std::tuple compileQuery( if (iter == last_output.end()) throw DB::Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR); std::stringstream ss; - DB::EncodeNumber(iter - last_output.begin(), ss); + encodeDAGInt64(iter - last_output.begin(), ss); pair.second->set_val(ss.str()); } executor_ctx.output = last_output; diff --git a/dbms/src/Flash/Coprocessor/DAGCodec.h b/dbms/src/Flash/Coprocessor/DAGCodec.h new file mode 100644 index 00000000000..b44aeb77e59 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGCodec.h @@ -0,0 +1,23 @@ +#pragma once + +#include +#include + +namespace DB +{ + +void encodeDAGInt64(Int64, std::stringstream &); +void encodeDAGUInt64(UInt64, std::stringstream &); +void encodeDAGFloat64(Float64, std::stringstream &); +void encodeDAGBytes(const String &, std::stringstream &); +void encodeDAGCompactBytes(const String &, std::stringstream &); +void encodeDAGDecimal(const Decimal &, std::stringstream &); + +Int64 decodeDAGInt64(const String &); +UInt64 decodeDAGUInt64(const String &); +Float64 decodeDAGFloat64(const String &); +String decodeDAGBytes(const String &); +String decodeDAGCompactBytes(const String &); +Decimal decodeDAGDecimal(const String &); + +} // namespace DB \ No newline at end of file From 0c8e3a52a0cb516f2faa3620e2b8d5586585d41e Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 15 Aug 2019 13:43:49 +0800 Subject: [PATCH 18/21] type --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 5287e5a97ba..dbc3c9986f3 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -174,7 +174,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un { case Field::Types::Which::Null: expr->set_tp(tipb::Null); - // Null literal epxr doesn't need value. + // Null literal expr doesn't need value. break; case Field::Types::Which::UInt64: expr->set_tp(tipb::Uint64); From 41d2b4f6a002239a1f533d39d69a2eebdfbbeedd Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 15 Aug 2019 15:42:53 +0800 Subject: [PATCH 19/21] Update codec accordingly --- dbms/src/Flash/Coprocessor/DAGCodec.cpp | 65 +++++++++++++++++++++++++ dbms/src/Flash/Coprocessor/DAGCodec.h | 6 ++- dbms/src/Flash/Coprocessor/DAGUtils.cpp | 33 +++++++------ 3 files changed, 86 insertions(+), 18 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/DAGCodec.cpp diff --git a/dbms/src/Flash/Coprocessor/DAGCodec.cpp b/dbms/src/Flash/Coprocessor/DAGCodec.cpp new file mode 100644 index 00000000000..9d809cc1258 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGCodec.cpp @@ -0,0 +1,65 @@ +#include + +#include +#include + +namespace DB +{ + +void encodeDAGInt64(Int64 i, std::stringstream & ss) +{ + auto u = RecordKVFormat::encodeInt64(i); + ss.write(reinterpret_cast(&u), sizeof(u)); +} + +void encodeDAGUInt64(UInt64 i, std::stringstream & ss) +{ + auto u = RecordKVFormat::encodeUInt64(i); + ss.write(reinterpret_cast(&u), sizeof(u)); +} + +void encodeDAGFloat32(Float32 f, std::stringstream & ss) { EncodeFloat64(f, ss); } + +void encodeDAGFloat64(Float64 f, std::stringstream & ss) { EncodeFloat64(f, ss); } + +void encodeDAGString(const String & s, std::stringstream & ss) { ss << s; } + +void encodeDAGBytes(const String & bytes, std::stringstream & ss) { ss << bytes; } + +void encodeDAGDecimal(const Decimal & d, std::stringstream & ss) { EncodeDecimal(d, ss); } + +Int64 decodeDAGInt64(const String & s) +{ + auto u = *(reinterpret_cast(s.data())); + return RecordKVFormat::decodeInt64(u); +} + +UInt64 decodeDAGUInt64(const String & s) +{ + auto u = *(reinterpret_cast(s.data())); + return RecordKVFormat::decodeUInt64(u); +} + +Float32 decodeDAGFloat32(const String & s) +{ + size_t cursor = 0; + return DecodeFloat64(cursor, s); +} + +Float64 decodeDAGFloat64(const String & s) +{ + size_t cursor = 0; + return DecodeFloat64(cursor, s); +} + +String decodeDAGString(const String & s) { return s; } + +String decodeDAGBytes(const String & s) { return s; } + +Decimal decodeDAGDecimal(const String & s) +{ + size_t cursor = 0; + return DecodeDecimal(cursor, s); +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGCodec.h b/dbms/src/Flash/Coprocessor/DAGCodec.h index b44aeb77e59..faecf74df1f 100644 --- a/dbms/src/Flash/Coprocessor/DAGCodec.h +++ b/dbms/src/Flash/Coprocessor/DAGCodec.h @@ -8,16 +8,18 @@ namespace DB void encodeDAGInt64(Int64, std::stringstream &); void encodeDAGUInt64(UInt64, std::stringstream &); +void encodeDAGFloat32(Float32, std::stringstream &); void encodeDAGFloat64(Float64, std::stringstream &); +void encodeDAGString(const String &, std::stringstream &); void encodeDAGBytes(const String &, std::stringstream &); -void encodeDAGCompactBytes(const String &, std::stringstream &); void encodeDAGDecimal(const Decimal &, std::stringstream &); Int64 decodeDAGInt64(const String &); UInt64 decodeDAGUInt64(const String &); +Float32 decodeDAGFloat32(const String &); Float64 decodeDAGFloat64(const String &); +String decodeDAGString(const String &); String decodeDAGBytes(const String &); -String decodeDAGCompactBytes(const String &); Decimal decodeDAGDecimal(const String &); } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index ee125ce3f97..79720f0b37b 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -1,9 +1,8 @@ #include #include +#include #include -#include -#include #include @@ -51,7 +50,6 @@ const String & getFunctionName(const tipb::Expr & expr) String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser) { std::stringstream ss; - size_t cursor = 0; Int64 column_id = 0; String func_name; Field f; @@ -60,19 +58,21 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col case tipb::ExprType::Null: return "NULL"; case tipb::ExprType::Int64: - return std::to_string(RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data()))); + return std::to_string(decodeDAGInt64(expr.val())); case tipb::ExprType::Uint64: - return std::to_string(DecodeInt(cursor, expr.val())); + return std::to_string(decodeDAGUInt64(expr.val())); case tipb::ExprType::Float32: + return std::to_string(decodeDAGFloat32(expr.val())); case tipb::ExprType::Float64: - return std::to_string(DecodeFloat64(cursor, expr.val())); + return std::to_string(decodeDAGFloat64(expr.val())); case tipb::ExprType::String: + return decodeDAGString(expr.val()); case tipb::ExprType::Bytes: - return expr.val(); + return decodeDAGBytes(expr.val()); case tipb::ExprType::MysqlDecimal: - return DecodeDecimal(cursor, expr.val()).toString(); + return decodeDAGDecimal(expr.val()).toString(); case tipb::ExprType::ColumnRef: - column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data())); + column_id = decodeDAGInt64(expr.val()); if (column_id < 0 || column_id >= (ColumnID)input_col.size()) { throw Exception("Column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); @@ -191,23 +191,24 @@ bool isColumnExpr(const tipb::Expr & expr) { return expr.tp() == tipb::ExprType: Field decodeLiteral(const tipb::Expr & expr) { - size_t cursor = 0; switch (expr.tp()) { case tipb::ExprType::Null: return Field(); case tipb::ExprType::Int64: - return RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data())); + return decodeDAGInt64(expr.val()); case tipb::ExprType::Uint64: - return DecodeInt(cursor, expr.val()); + return decodeDAGUInt64(expr.val()); case tipb::ExprType::Float32: + return Float64(decodeDAGFloat32(expr.val())); case tipb::ExprType::Float64: - return DecodeFloat64(cursor, expr.val()); + return decodeDAGFloat64(expr.val()); case tipb::ExprType::String: + return decodeDAGString(expr.val()); case tipb::ExprType::Bytes: - return expr.val(); + return decodeDAGBytes(expr.val()); case tipb::ExprType::MysqlDecimal: - return DecodeDecimal(cursor, expr.val()); + return decodeDAGDecimal(expr.val()); case tipb::ExprType::MysqlBit: case tipb::ExprType::MysqlDuration: case tipb::ExprType::MysqlEnum: @@ -224,7 +225,7 @@ Field decodeLiteral(const tipb::Expr & expr) ColumnID getColumnID(const tipb::Expr & expr) { - auto column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data())); + auto column_id = decodeDAGInt64(expr.val()); return column_id; } From 17111f59a72bbc160af9fd9eed337a72a7f9de77 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 15 Aug 2019 16:19:40 +0800 Subject: [PATCH 20/21] Remove cop-test --- .../Flash/Coprocessor/tests/CMakeLists.txt | 3 - dbms/src/Flash/Coprocessor/tests/cop_test.cpp | 332 ------------------ 2 files changed, 335 deletions(-) delete mode 100644 dbms/src/Flash/Coprocessor/tests/cop_test.cpp diff --git a/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt b/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt index c236d367c5d..b8e4b57cbca 100644 --- a/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt +++ b/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt @@ -1,4 +1 @@ include_directories (${CMAKE_CURRENT_BINARY_DIR}) - -add_executable (cop_test cop_test.cpp) -target_link_libraries (cop_test dbms) diff --git a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp deleted file mode 100644 index 4babeececd4..00000000000 --- a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp +++ /dev/null @@ -1,332 +0,0 @@ -#include -#include - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#include -#include -#include -#include -#pragma GCC diagnostic pop - -#include - - -using ChannelPtr = std::shared_ptr; -using SubPtr = std::shared_ptr; -static const int DAGREQUEST = 103; -class FlashClient -{ -private: - SubPtr sp; - -public: - static std::string decodeDatumToString(size_t & cursor, const std::string & raw_data) - { - switch (raw_data[cursor++]) - { - case TiDB::CodecFlagNil: - return "NULL"; - case TiDB::CodecFlagInt: - return std::to_string(DB::DecodeInt(cursor, raw_data)); - case TiDB::CodecFlagUInt: - return std::to_string(DB::DecodeInt(cursor, raw_data)); - case TiDB::CodecFlagBytes: - return DB::DecodeBytes(cursor, raw_data); - case TiDB::CodecFlagCompactBytes: - return DB::DecodeCompactBytes(cursor, raw_data); - case TiDB::CodecFlagFloat: - return std::to_string(DB::DecodeFloat64(cursor, raw_data)); - case TiDB::CodecFlagVarUInt: - return std::to_string(DB::DecodeVarUInt(cursor, raw_data)); - case TiDB::CodecFlagVarInt: - return std::to_string(DB::DecodeVarInt(cursor, raw_data)); - case TiDB::CodecFlagDuration: - throw DB::Exception("Not implented yet. DecodeDatum: CodecFlagDuration"); - case TiDB::CodecFlagDecimal: - return DB::DecodeDecimal(cursor, raw_data).toString(); - default: - throw DB::Exception("Unknown Type:" + std::to_string(raw_data[cursor - 1])); - } - } - - FlashClient(ChannelPtr cp) : sp(tikvpb::Tikv::NewStub(cp)) {} - grpc::Status coprocessor(coprocessor::Request * rqst, size_t output_column_num) - { - grpc::ClientContext clientContext; - clientContext.AddMetadata("user_name", ""); - clientContext.AddMetadata("dag_planner", "optree"); - clientContext.AddMetadata("dag_expr_field_type_strict_check", "0"); - coprocessor::Response response; - grpc::Status status = sp->Coprocessor(&clientContext, *rqst, &response); - if (status.ok()) - { - // if status is ok, try to decode the result - tipb::SelectResponse selectResponse; - if (selectResponse.ParseFromString(response.data())) - { - if (selectResponse.has_error()) - { - std::cout << "Coprocessor request failed, error code " << selectResponse.error().code() << " error msg " - << selectResponse.error().msg(); - return status; - } - for (const tipb::Chunk & chunk : selectResponse.chunks()) - { - size_t cursor = 0; - const std::string & data = chunk.rows_data(); - while (cursor < data.size()) - { - for (size_t i = 0; i < output_column_num; i++) - { - std::cout << decodeDatumToString(cursor, data) << " "; - } - std::cout << std::endl; - } - } - std::cout << "Execute summary: " << std::endl; - for (int i = 0; i < selectResponse.execution_summaries_size(); i++) - { - auto & summary = selectResponse.execution_summaries(i); - std::cout << "Executor " << i; - std::cout << " time = " << summary.time_processed_ns() << " ns "; - std::cout << " rows = " << summary.num_produced_rows(); - std::cout << " iter nums = " << summary.num_iterations(); - std::cout << std::endl; - } - } - } - else - { - std::cout << "Coprocessor request failed, error code " << status.error_code() << " error msg " << status.error_message(); - } - return status; - } -}; - -using ClientPtr = std::shared_ptr; - -void appendTS(tipb::DAGRequest & dag_request, size_t & result_field_num) -{ - // table scan: s,i - tipb::Executor * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeTableScan); - tipb::TableScan * ts = executor->mutable_tbl_scan(); - ts->set_table_id(44); - tipb::ColumnInfo * ci = ts->add_columns(); - ci->set_column_id(1); - ci->set_tp(0xfe); - ci->set_flag(0); - ci = ts->add_columns(); - ci->set_column_id(2); - ci->set_tp(8); - ci->set_flag(0); - dag_request.add_output_offsets(1); - dag_request.add_output_offsets(0); - dag_request.add_output_offsets(1); - result_field_num = 3; -} - -void appendSelection(tipb::DAGRequest & dag_request) -{ - // selection: less(i, 123) - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeSelection); - tipb::Selection * selection = executor->mutable_selection(); - tipb::Expr * expr = selection->add_conditions(); - expr->set_tp(tipb::ExprType::ScalarFunc); - expr->set_sig(tipb::ScalarFuncSig::LTInt); - tipb::Expr * col = expr->add_children(); - tipb::Expr * value = expr->add_children(); - col->set_tp(tipb::ExprType::ColumnRef); - std::stringstream ss; - DB::EncodeNumber(1, ss); - col->set_val(ss.str()); - auto * type = col->mutable_field_type(); - type->set_tp(8); - type->set_flag(0); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(10, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - - // selection i in (5,10,11) - selection->clear_conditions(); - expr = selection->add_conditions(); - expr->set_tp(tipb::ExprType::ScalarFunc); - expr->set_sig(tipb::ScalarFuncSig::InInt); - col = expr->add_children(); - col->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); - DB::EncodeNumber(1, ss); - col->set_val(ss.str()); - type = col->mutable_field_type(); - type->set_tp(8); - type->set_flag(0); - value = expr->add_children(); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(10, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - value = expr->add_children(); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(5, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - value = expr->add_children(); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(11, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - - // selection i is null - /* - selection->clear_conditions(); - expr = selection->add_conditions(); - expr->set_tp(tipb::ExprType::ScalarFunc); - expr->set_sig(tipb::ScalarFuncSig::IntIsNull); - col = expr->add_children(); - col->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); - DB::EncodeNumber(1, ss); - col->set_val(ss.str()); - */ -} - -void appendAgg(tipb::DAGRequest & dag_request, size_t & result_field_num) -{ - // agg: count(s) group by i; - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeAggregation); - auto agg = executor->mutable_aggregation(); - auto agg_func = agg->add_agg_func(); - agg_func->set_tp(tipb::ExprType::Count); - auto child = agg_func->add_children(); - child->set_tp(tipb::ExprType::ColumnRef); - std::stringstream ss; - DB::EncodeNumber(0, ss); - child->set_val(ss.str()); - auto f_type = agg_func->mutable_field_type(); - f_type->set_tp(3); - f_type->set_flag(33); - auto group_col = agg->add_group_by(); - group_col->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); - DB::EncodeNumber(1, ss); - group_col->set_val(ss.str()); - f_type = group_col->mutable_field_type(); - f_type->set_tp(8); - f_type->set_flag(1); - result_field_num = 2; -} - -void appendTopN(tipb::DAGRequest & dag_request) -{ - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeTopN); - tipb::TopN * topN = executor->mutable_topn(); - topN->set_limit(3); - tipb::ByItem * byItem = topN->add_order_by(); - byItem->set_desc(false); - tipb::Expr * expr1 = byItem->mutable_expr(); - expr1->set_tp(tipb::ExprType::ColumnRef); - std::stringstream ss; - DB::EncodeNumber(1, ss); - expr1->set_val(ss.str()); - auto * type = expr1->mutable_field_type(); - type->set_tp(8); - type->set_tp(0); -} - -void appendLimit(tipb::DAGRequest & dag_request) -{ - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeLimit); - tipb::Limit * limit = executor->mutable_limit(); - limit->set_limit(5); -} - -grpc::Status rpcTest() -{ - ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials()); - ClientPtr clientPtr = std::make_shared(cp); - size_t result_field_num = 0; - bool has_selection = true; - bool has_agg = false; - bool has_topN = true; - bool has_limit = false; - // construct a dag request - tipb::DAGRequest dagRequest; - dagRequest.set_start_ts(18446744073709551615uL); - - appendTS(dagRequest, result_field_num); - if (has_selection) - appendSelection(dagRequest); - if (has_agg) - appendAgg(dagRequest, result_field_num); - if (has_topN) - appendTopN(dagRequest); - if (has_limit) - appendLimit(dagRequest); - - // construct a coprocessor request - coprocessor::Request request; - //todo add context info - kvrpcpb::Context * ctx = request.mutable_context(); - ctx->set_region_id(2); - auto region_epoch = ctx->mutable_region_epoch(); - region_epoch->set_version(21); - region_epoch->set_conf_ver(2); - request.set_tp(DAGREQUEST); - request.set_data(dagRequest.SerializeAsString()); - //request.add_ranges(); - return clientPtr->coprocessor(&request, result_field_num); -} - -void codecTest() -{ - Int64 i = 123; - std::stringstream ss; - DB::EncodeNumber(i, ss); - std::string val = ss.str(); - std::stringstream decode_ss; - size_t cursor = 0; - DB::Field f = DB::DecodeDatum(cursor, val); - Int64 r = f.get(); - r++; -} - -int main() -{ - // std::cout << "Before rpcTest"<< std::endl; - grpc::Status ret = rpcTest(); - // codecTest(); - // std::cout << "End rpcTest " << std::endl; - // std::cout << "The ret is " << ret.error_code() << " " << ret.error_details() - // << " " << ret.error_message() << std::endl; - return 0; -} From d6eefa77a2cc5cf6e42c2813dedfc59977dd4c37 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Sun, 25 Aug 2019 03:00:31 +0800 Subject: [PATCH 21/21] Pass tests after merging master --- dbms/src/Debug/dbgFuncCoprocessor.cpp | 13 +- dbms/src/Debug/dbgFuncRegion.cpp | 5 +- dbms/src/Debug/dbgTools.cpp | 130 ++++++++++++- dbms/src/Debug/dbgTools.h | 2 + .../Coprocessor/DAGBlockOutputStream.cpp | 9 +- dbms/src/Flash/Coprocessor/DAGCodec.cpp | 4 +- dbms/src/Flash/Coprocessor/DAGCodec.h | 4 +- dbms/src/Flash/Coprocessor/DAGUtils.cpp | 14 +- dbms/src/Storages/Transaction/Codec.cpp | 77 ++++---- dbms/src/Storages/Transaction/Codec.h | 3 +- dbms/src/Storages/Transaction/Datum.cpp | 11 ++ .../src/Storages/Transaction/MyTimeParser.cpp | 172 ++++++++++++++++++ dbms/src/Storages/Transaction/MyTimeParser.h | 165 +---------------- dbms/src/Storages/Transaction/TiDB.cpp | 25 +-- dbms/src/Storages/Transaction/TiDB.h | 3 +- .../Storages/Transaction/TiKVRecordFormat.h | 15 -- 16 files changed, 396 insertions(+), 256 deletions(-) create mode 100644 dbms/src/Storages/Transaction/MyTimeParser.cpp diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 83828238803..594a0c4ee1c 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -49,8 +49,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) region_id = safeGet(typeid_cast(*args[1]).value); Timestamp start_ts = context.getTMTContext().getPDClient()->getTS(); - auto [table_id, schema, dag_request] = compileQuery( - context, query, + auto [table_id, schema, dag_request] = compileQuery(context, query, [&](const String & database_name, const String & table_name) { auto storage = context.getTable(database_name, table_name); auto mmt = std::dynamic_pointer_cast(storage); @@ -92,8 +91,7 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) if (start_ts == 0) start_ts = context.getTMTContext().getPDClient()->getTS(); - auto [table_id, schema, dag_request] = compileQuery( - context, query, + auto [table_id, schema, dag_request] = compileQuery(context, query, [&](const String & database_name, const String & table_name) { return MockTiDB::instance().getTableByName(database_name, table_name)->table_info; }, @@ -210,9 +208,12 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un expr->set_tp(tipb::Float64); encodeDAGFloat64(lit->value.get(), ss); break; - case Field::Types::Which::Decimal: + case Field::Types::Which::Decimal32: + case Field::Types::Which::Decimal64: + case Field::Types::Which::Decimal128: + case Field::Types::Which::Decimal256: expr->set_tp(tipb::MysqlDecimal); - encodeDAGDecimal(lit->value.get(), ss); + encodeDAGDecimal(lit->value, ss); break; case Field::Types::Which::String: expr->set_tp(tipb::String); diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 1eefde4739a..257d7694d0a 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -140,7 +139,9 @@ void dbgFuncRegionSnapshotWithData(Context & context, const ASTs & args, DBGInvo } TiKVKey key = RecordKVFormat::genKey(table_id, handle_id); - TiKVValue value = RecordKVFormat::EncodeRow(table->table_info, fields); + std::stringstream ss; + RegionBench::encodeRow(table->table_info, fields, ss); + TiKVValue value(ss.str()); UInt64 commit_ts = tso; UInt64 prewrite_ts = tso; TiKVValue commit_value; diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 5e40faabd9a..305ac3f8260 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -23,6 +24,9 @@ extern const int LOGICAL_ERROR; namespace RegionBench { +using TiDB::ColumnInfo; +using TiDB::TableInfo; + RegionPtr createRegion(TableID table_id, RegionID region_id, const HandleID & start, const HandleID & end) { enginepb::SnapshotRequest request; @@ -121,6 +125,128 @@ void addRequestsToRaftCmd(enginepb::CommandRequest * cmd, RegionID region_id, co } } +template +T convertNumber(const Field & field) +{ + switch (field.getType()) + { + case Field::Types::Int64: + return static_cast(field.get()); + case Field::Types::UInt64: + return static_cast(field.get()); + case Field::Types::Float64: + return static_cast(field.get()); + case Field::Types::Decimal32: + return static_cast(field.get>()); + case Field::Types::Decimal64: + return static_cast(field.get>()); + case Field::Types::Decimal128: + return static_cast(field.get>()); + case Field::Types::Decimal256: + return static_cast(field.get>()); + default: + throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to number", ErrorCodes::LOGICAL_ERROR); + } +} + +Field convertDecimal(UInt32 scale, const Field & field) +{ + switch (field.getType()) + { + case Field::Types::Int64: + return DecimalField(ToDecimal(field.get(), scale), scale); + case Field::Types::UInt64: + return DecimalField(ToDecimal(field.get(), scale), scale); + case Field::Types::Float64: + return DecimalField(ToDecimal(field.get(), scale), scale); + case Field::Types::Decimal32: + case Field::Types::Decimal64: + case Field::Types::Decimal128: + case Field::Types::Decimal256: + return field; + default: + throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to number", ErrorCodes::LOGICAL_ERROR); + } +} + +Field convertEnum(const ColumnInfo & column_info, const Field & field) +{ + switch (field.getType()) + { + case Field::Types::Int64: + case Field::Types::UInt64: + return convertNumber(field); + case Field::Types::String: + return static_cast(column_info.getEnumIndex(field.get())); + default: + throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to Enum", ErrorCodes::LOGICAL_ERROR); + } +} + +Field convertField(const ColumnInfo & column_info, const Field & field) +{ + if (field.isNull()) + return field; + + switch (column_info.tp) + { + case TiDB::TypeTiny: + case TiDB::TypeShort: + case TiDB::TypeLong: + case TiDB::TypeLongLong: + case TiDB::TypeInt24: + case TiDB::TypeBit: + if (column_info.hasUnsignedFlag()) + return convertNumber(field); + else + return convertNumber(field); + case TiDB::TypeFloat: + case TiDB::TypeDouble: + return convertNumber(field); + case TiDB::TypeDate: + case TiDB::TypeDatetime: + case TiDB::TypeTimestamp: + return DB::parseMyDatetime(field.get()); + case TiDB::TypeVarchar: + case TiDB::TypeTinyBlob: + case TiDB::TypeMediumBlob: + case TiDB::TypeLongBlob: + case TiDB::TypeBlob: + case TiDB::TypeVarString: + case TiDB::TypeString: + return field; + case TiDB::TypeEnum: + return convertEnum(column_info, field); + case TiDB::TypeNull: + return Field(); + case TiDB::TypeDecimal: + case TiDB::TypeNewDecimal: + return convertDecimal(column_info.decimal, field); + case TiDB::TypeTime: + throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to Time", ErrorCodes::LOGICAL_ERROR); + case TiDB::TypeYear: + throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to Year", ErrorCodes::LOGICAL_ERROR); + case TiDB::TypeSet: + throw Exception(String("Unable to convert field type ") + field.getTypeName() + " to Set", ErrorCodes::LOGICAL_ERROR); + default: + return Field(); + } +} + +void encodeRow(const TiDB::TableInfo & table_info, const std::vector & fields, std::stringstream & ss) +{ + if (table_info.columns.size() != fields.size()) + throw Exception("Encoding row has different sizes between columns and values", ErrorCodes::LOGICAL_ERROR); + for (size_t i = 0; i < fields.size(); i++) + { + const TiDB::ColumnInfo & column_info = table_info.columns[i]; + EncodeDatum(Field(column_info.id), TiDB::CodecFlagInt, ss); + Field field = convertField(column_info, fields[i]); + TiDB::DatumBumpy datum = TiDB::DatumBumpy(field, column_info.tp); + EncodeDatum(datum.field(), column_info.getCodecFlag(), ss); + } +} + void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID handle_id, ASTs::const_iterator begin, ASTs::const_iterator end, Context & context, const std::optional> & tso_del) { @@ -142,7 +268,9 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han TableID table_id = RecordKVFormat::getTableId(region->getRange().first); TiKVKey key = RecordKVFormat::genKey(table_id, handle_id); - TiKVValue value = RecordKVFormat::EncodeRow(table_info, fields); + std::stringstream ss; + encodeRow(table_info, fields, ss); + TiKVValue value(ss.str()); UInt64 prewrite_ts = pd_client->getTS(); UInt64 commit_ts = pd_client->getTS(); diff --git a/dbms/src/Debug/dbgTools.h b/dbms/src/Debug/dbgTools.h index 5ed25a4eb02..70372a2c765 100644 --- a/dbms/src/Debug/dbgTools.h +++ b/dbms/src/Debug/dbgTools.h @@ -23,6 +23,8 @@ RegionPtr createRegion(TableID table_id, RegionID region_id, const HandleID & st Regions createRegions(TableID table_id, size_t region_num, size_t key_num_each_region, HandleID handle_begin, RegionID new_region_id_begin); +void encodeRow(const TiDB::TableInfo & table_info, const std::vector & fields, std::stringstream & ss); + void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID handle_id, ASTs::const_iterator begin, ASTs::const_iterator end, Context & context, const std::optional> & tso_del = {}); diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp index 0ef25b08700..b475ac3cb24 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp @@ -2,6 +2,7 @@ #include #include +#include #include namespace DB @@ -13,6 +14,9 @@ extern const int UNSUPPORTED_PARAMETER; extern const int LOGICAL_ERROR; } // namespace ErrorCodes +using TiDB::DatumBumpy; +using TiDB::TP; + DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_, std::vector && result_field_types_, Block header_) : dag_response(dag_response_), @@ -71,8 +75,9 @@ void DAGBlockOutputStream::write(const Block & block) } for (size_t j = 0; j < block.columns(); j++) { - auto field = (*block.getByPosition(j).column.get())[i]; - EncodeDatum(field, getCodecFlagByFieldType(result_field_types[j]), current_ss); + const auto & field = (*block.getByPosition(j).column.get())[i]; + DatumBumpy datum(field, static_cast(result_field_types[j].tp())); + EncodeDatum(datum.field(), getCodecFlagByFieldType(result_field_types[j]), current_ss); } // Encode current row records_per_chunk++; diff --git a/dbms/src/Flash/Coprocessor/DAGCodec.cpp b/dbms/src/Flash/Coprocessor/DAGCodec.cpp index 9d809cc1258..2316cdcad99 100644 --- a/dbms/src/Flash/Coprocessor/DAGCodec.cpp +++ b/dbms/src/Flash/Coprocessor/DAGCodec.cpp @@ -26,7 +26,7 @@ void encodeDAGString(const String & s, std::stringstream & ss) { ss << s; } void encodeDAGBytes(const String & bytes, std::stringstream & ss) { ss << bytes; } -void encodeDAGDecimal(const Decimal & d, std::stringstream & ss) { EncodeDecimal(d, ss); } +void encodeDAGDecimal(const Field & field, std::stringstream & ss) { EncodeDecimal(field, ss); } Int64 decodeDAGInt64(const String & s) { @@ -56,7 +56,7 @@ String decodeDAGString(const String & s) { return s; } String decodeDAGBytes(const String & s) { return s; } -Decimal decodeDAGDecimal(const String & s) +Field decodeDAGDecimal(const String & s) { size_t cursor = 0; return DecodeDecimal(cursor, s); diff --git a/dbms/src/Flash/Coprocessor/DAGCodec.h b/dbms/src/Flash/Coprocessor/DAGCodec.h index faecf74df1f..44fb9e5bc3f 100644 --- a/dbms/src/Flash/Coprocessor/DAGCodec.h +++ b/dbms/src/Flash/Coprocessor/DAGCodec.h @@ -12,7 +12,7 @@ void encodeDAGFloat32(Float32, std::stringstream &); void encodeDAGFloat64(Float64, std::stringstream &); void encodeDAGString(const String &, std::stringstream &); void encodeDAGBytes(const String &, std::stringstream &); -void encodeDAGDecimal(const Decimal &, std::stringstream &); +void encodeDAGDecimal(const Field &, std::stringstream &); Int64 decodeDAGInt64(const String &); UInt64 decodeDAGUInt64(const String &); @@ -20,6 +20,6 @@ Float32 decodeDAGFloat32(const String &); Float64 decodeDAGFloat64(const String &); String decodeDAGString(const String &); String decodeDAGBytes(const String &); -Decimal decodeDAGDecimal(const String &); +Field decodeDAGDecimal(const String &); } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index be1bbb92aa9..2a334230573 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -71,7 +71,19 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col case tipb::ExprType::Bytes: return decodeDAGBytes(expr.val()); case tipb::ExprType::MysqlDecimal: - return decodeDAGDecimal(expr.val()).toString(); + { + auto field = decodeDAGDecimal(expr.val()); + if (field.getType() == Field::Types::Decimal32) + return field.get>().toString(); + else if (field.getType() == Field::Types::Decimal64) + return field.get>().toString(); + else if (field.getType() == Field::Types::Decimal128) + return field.get>().toString(); + else if (field.getType() == Field::Types::Decimal256) + return field.get>().toString(); + else + throw Exception("Not decimal literal" + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST); + } case tipb::ExprType::ColumnRef: column_id = decodeDAGInt64(expr.val()); if (column_id < 0 || column_id >= (ColumnID)input_col.size()) diff --git a/dbms/src/Storages/Transaction/Codec.cpp b/dbms/src/Storages/Transaction/Codec.cpp index 7465a04b6b5..d57c5023d77 100644 --- a/dbms/src/Storages/Transaction/Codec.cpp +++ b/dbms/src/Storages/Transaction/Codec.cpp @@ -410,7 +410,7 @@ inline void writeWord(String & buf, Int32 word, int size) } template -void EncodeDecimal(const T & dec, PrecType prec, ScaleType frac, std::stringstream & ss) +void EncodeDecimalImpl(const T & dec, PrecType prec, ScaleType frac, std::stringstream & ss) { static_assert(IsDecimal); @@ -476,19 +476,31 @@ void EncodeDecimal(const T & dec, PrecType prec, ScaleType frac, std::stringstre ss.write(buf.c_str(), buf.size()); } -template -inline T getFieldValue(const Field & field) +void EncodeDecimal(const Field & field, std::stringstream & ss) { - switch (field.getType()) + if (field.getType() == Field::Types::Decimal32) { - case Field::Types::UInt64: - return static_cast(field.get()); - case Field::Types::Int64: - return static_cast(field.get()); - case Field::Types::Float64: - return static_cast(field.get()); - default: - throw Exception("Unsupport (getFieldValue): " + std::string(field.getTypeName()), ErrorCodes::LOGICAL_ERROR); + auto decimal_field = field.get>(); + return EncodeDecimalImpl(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); + } + else if (field.getType() == Field::Types::Decimal64) + { + auto decimal_field = field.get>(); + return EncodeDecimalImpl(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); + } + else if (field.getType() == Field::Types::Decimal128) + { + auto decimal_field = field.get>(); + return EncodeDecimalImpl(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); + } + else if (field.getType() == Field::Types::Decimal256) + { + auto decimal_field = field.get>(); + return EncodeDecimalImpl(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); + } + else + { + throw Exception("Not a decimal when decoding decimal", ErrorCodes::LOGICAL_ERROR); } } @@ -503,46 +515,27 @@ void EncodeDatum(const Field & field, TiDB::CodecFlag flag, std::stringstream & switch (flag) { case TiDB::CodecFlagDecimal: - if (field.getType() == Field::Types::Decimal32) - { - auto decimal_field = field.get>(); - return EncodeDecimal(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); - } - else if (field.getType() == Field::Types::Decimal64) - { - auto decimal_field = field.get>(); - return EncodeDecimal(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); - } - else if (field.getType() == Field::Types::Decimal128) - { - auto decimal_field = field.get>(); - return EncodeDecimal(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); - } - else - { - auto decimal_field = field.get>(); - return EncodeDecimal(decimal_field.getValue(), decimal_field.getPrec(), decimal_field.getScale(), ss); - } + return EncodeDecimal(field, ss); case TiDB::CodecFlagCompactBytes: - return EncodeCompactBytes(field.get(), ss); + return EncodeCompactBytes(field.safeGet(), ss); case TiDB::CodecFlagFloat: - return EncodeFloat64(getFieldValue(field), ss); + return EncodeFloat64(field.safeGet(), ss); case TiDB::CodecFlagUInt: - return EncodeUInt(getFieldValue(field), ss); + return EncodeUInt(field.safeGet(), ss); case TiDB::CodecFlagInt: - return EncodeInt64(getFieldValue(field), ss); + return EncodeInt64(field.safeGet(), ss); case TiDB::CodecFlagVarInt: - return EncodeVarInt(getFieldValue(field), ss); + return EncodeVarInt(field.safeGet(), ss); case TiDB::CodecFlagVarUInt: - return EncodeVarUInt(getFieldValue(field), ss); + return EncodeVarUInt(field.safeGet(), ss); default: throw Exception("Not implemented codec flag: " + std::to_string(flag), ErrorCodes::LOGICAL_ERROR); } } -template void EncodeDecimal(const Decimal32 &, PrecType, ScaleType, std::stringstream & ss); -template void EncodeDecimal(const Decimal64 &, PrecType, ScaleType, std::stringstream & ss); -template void EncodeDecimal(const Decimal128 &, PrecType, ScaleType, std::stringstream & ss); -template void EncodeDecimal(const Decimal256 &, PrecType, ScaleType, std::stringstream & ss); +template void EncodeDecimalImpl(const Decimal32 &, PrecType, ScaleType, std::stringstream & ss); +template void EncodeDecimalImpl(const Decimal64 &, PrecType, ScaleType, std::stringstream & ss); +template void EncodeDecimalImpl(const Decimal128 &, PrecType, ScaleType, std::stringstream & ss); +template void EncodeDecimalImpl(const Decimal256 &, PrecType, ScaleType, std::stringstream & ss); } // namespace DB diff --git a/dbms/src/Storages/Transaction/Codec.h b/dbms/src/Storages/Transaction/Codec.h index 53265ca4d20..38998fd04a0 100644 --- a/dbms/src/Storages/Transaction/Codec.h +++ b/dbms/src/Storages/Transaction/Codec.h @@ -62,8 +62,7 @@ void EncodeVarUInt(UInt64 num, std::stringstream & ss); void EncodeVarInt(Int64 num, std::stringstream & ss); -template -void EncodeDecimal(const T & dec, PrecType prec, ScaleType frac, std::stringstream & ss); +void EncodeDecimal(const Field & field, std::stringstream & ss); void EncodeDatum(const Field & field, TiDB::CodecFlag flag, std::stringstream & ss); diff --git a/dbms/src/Storages/Transaction/Datum.cpp b/dbms/src/Storages/Transaction/Datum.cpp index 8dc4b357c67..72301b9fa9b 100644 --- a/dbms/src/Storages/Transaction/Datum.cpp +++ b/dbms/src/Storages/Transaction/Datum.cpp @@ -135,6 +135,17 @@ struct DatumOp +struct DatumOp::type> +{ + static void unflatten(const Field & orig, std::optional & copy) { copy = static_cast(orig.get()); } + + static void flatten(const Field & orig, std::optional & copy) { copy = static_cast(orig.get()); } + + static bool overflow(const Field &, const ColumnInfo &) { return false; } +}; + DatumFlat::DatumFlat(const DB::Field & field, TP tp) : DatumBase(field, tp) { switch (tp) diff --git a/dbms/src/Storages/Transaction/MyTimeParser.cpp b/dbms/src/Storages/Transaction/MyTimeParser.cpp new file mode 100644 index 00000000000..e183e587ceb --- /dev/null +++ b/dbms/src/Storages/Transaction/MyTimeParser.cpp @@ -0,0 +1,172 @@ +#include + +#include +#include +#include +#include + +#include +#include + +namespace DB +{ + +int adjustYear(int year) +{ + if (year >= 0 && year <= 69) + return 2000 + year; + if (year >= 70 && year <= 99) + return 1900 + year; + return year; +} + +void scanTimeArgs(const std::vector & seps, std::initializer_list && list) +{ + int i = 0; + for (auto * ptr : list) + { + *ptr = std::stoi(seps[i]); + i++; + } +} + +int getFracIndex(const String & format) +{ + int idx = -1; + for (int i = int(format.size()) - 1; i >= 0; i--) + { + if (std::ispunct(format[i])) + { + if (format[i] == '.') + { + idx = i; + } + break; + } + } + return idx; +} + +std::vector parseDateFormat(String format) +{ + format = Poco::trimInPlace(format); + + std::vector seps; + size_t start = 0; + for (size_t i = 0; i < format.size(); i++) + { + if (i == 0 || i + 1 == format.size()) + { + if (!std::isdigit(format[i])) + return {}; + continue; + } + + if (!std::isdigit(format[i])) + { + if (!std::isdigit(format[i - 1])) + return {}; + seps.push_back(format.substr(start, i - start)); + start = i + 1; + } + } + seps.push_back(format.substr(start)); + return seps; +} + +std::vector splitDatetime(String format) +{ + int idx = getFracIndex(format); + if (idx > 0) + { + format = format.substr(0, idx); + } + return parseDateFormat(format); +} + +Field parseMyDatetime(const String & str) +{ + Int32 year = 0, month = 0, day = 0, hour = 0, minute = 0, second = 0; + + const auto & seps = splitDatetime(str); + + switch (seps.size()) + { + // No delimiter + case 1: + { + size_t l = seps[0].size(); + switch (l) + { + case 14: + // YYYYMMDDHHMMSS + { + std::sscanf(seps[0].c_str(), "%4d%2d%2d%2d%2d%2d", &year, &month, &day, &hour, &minute, &second); + break; + } + case 12: + { + std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%2d%2d", &year, &month, &day, &hour, &minute, &second); + year = adjustYear(year); + break; + } + case 11: + { + std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%2d%1d", &year, &month, &day, &hour, &minute, &second); + year = adjustYear(year); + break; + } + case 10: + { + std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%2d", &year, &month, &day, &hour, &minute); + year = adjustYear(year); + break; + } + case 9: + { + std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%1d", &year, &month, &day, &hour, &minute); + year = adjustYear(year); + break; + } + case 8: + { + std::sscanf(seps[0].c_str(), "%4d%2d%2d", &year, &month, &day); + break; + } + case 6: + case 5: + { + std::sscanf(seps[0].c_str(), "%2d%2d%2d", &year, &month, &day); + year = adjustYear(year); + break; + } + default: + { + throw Exception("Wrong datetime format"); + } + // TODO Process frac! + } + break; + } + case 3: + { + scanTimeArgs(seps, {&year, &month, &day}); + break; + } + case 6: + { + scanTimeArgs(seps, {&year, &month, &day, &hour, &minute, &second}); + break; + } + default: + { + throw Exception("Wrong datetime format"); + } + } + + UInt64 ymd = ((year * 13 + month) << 5) | day; + UInt64 hms = (hour << 12) | (minute << 6) | second; + return Field((ymd << 17 | hms) << 24); +} + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/MyTimeParser.h b/dbms/src/Storages/Transaction/MyTimeParser.h index 009bba29390..d58a4db258c 100644 --- a/dbms/src/Storages/Transaction/MyTimeParser.h +++ b/dbms/src/Storages/Transaction/MyTimeParser.h @@ -2,173 +2,10 @@ #include #include -#include -#include -#include -#include - -#include -#include namespace DB { -int adjustYear(int year) -{ - if (year >= 0 && year <= 69) - return 2000 + year; - if (year >= 70 && year <= 99) - return 1900 + year; - return year; -} - -void scanTimeArgs(const std::vector & seps, std::initializer_list && list) -{ - int i = 0; - for (auto * ptr : list) - { - *ptr = std::stoi(seps[i]); - i++; - } -} - -int getFracIndex(const String & format) -{ - int idx = -1; - for (int i = int(format.size()) - 1; i >= 0; i--) - { - if (std::ispunct(format[i])) - { - if (format[i] == '.') - { - idx = i; - } - break; - } - } - return idx; -} - -std::vector parseDateFormat(String format) -{ - format = Poco::trimInPlace(format); - - std::vector seps; - size_t start = 0; - for (size_t i = 0; i < format.size(); i++) - { - if (i == 0 || i + 1 == format.size()) - { - if (!std::isdigit(format[i])) - return {}; - continue; - } - - if (!std::isdigit(format[i])) - { - if (!std::isdigit(format[i - 1])) - return {}; - seps.push_back(format.substr(start, i - start)); - start = i + 1; - } - } - seps.push_back(format.substr(start)); - return seps; -} - -std::vector splitDatetime(String format) -{ - int idx = getFracIndex(format); - if (idx > 0) - { - format = format.substr(0, idx); - } - return parseDateFormat(format); -} - -Field parseMyDatetime(const String & str) -{ - Int32 year = 0, month = 0, day = 0, hour = 0, minute = 0, second = 0; - - const auto & seps = splitDatetime(str); - - switch (seps.size()) - { - // No delimiter - case 1: - { - size_t l = seps[0].size(); - switch (l) - { - case 14: - // YYYYMMDDHHMMSS - { - std::sscanf(seps[0].c_str(), "%4d%2d%2d%2d%2d%2d", &year, &month, &day, &hour, &minute, &second); - break; - } - case 12: - { - std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%2d%2d", &year, &month, &day, &hour, &minute, &second); - year = adjustYear(year); - break; - } - case 11: - { - std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%2d%1d", &year, &month, &day, &hour, &minute, &second); - year = adjustYear(year); - break; - } - case 10: - { - std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%2d", &year, &month, &day, &hour, &minute); - year = adjustYear(year); - break; - } - case 9: - { - std::sscanf(seps[0].c_str(), "%2d%2d%2d%2d%1d", &year, &month, &day, &hour, &minute); - year = adjustYear(year); - break; - } - case 8: - { - std::sscanf(seps[0].c_str(), "%4d%2d%2d", &year, &month, &day); - break; - } - case 6: - case 5: - { - std::sscanf(seps[0].c_str(), "%2d%2d%2d", &year, &month, &day); - year = adjustYear(year); - break; - } - default: - { - throw Exception("Wrong datetime format"); - } - // TODO Process frac! - } - break; - } - case 3: - { - scanTimeArgs(seps, {&year, &month, &day}); - break; - } - case 6: - { - scanTimeArgs(seps, {&year, &month, &day, &hour, &minute, &second}); - break; - } - default: - { - throw Exception("Wrong datetime format"); - } - } - - UInt64 ymd = ((year * 13 + month) << 5) | day; - UInt64 hms = (hour << 12) | (minute << 6) | second; - return Field((ymd << 17 | hms) << 24); -} +Field parseMyDatetime(const String & str); } // namespace DB diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index f1bd1d52d3e..bbdca6b2877 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -25,6 +25,7 @@ Field ColumnInfo::defaultValueToField() const } switch (tp) { + // TODO: Consider unsigned? // Integer Type. case TypeTiny: case TypeShort: @@ -157,8 +158,7 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (ColumnInfo): " + e.displayText(), DB::Exception(e)); } -void ColumnInfo::deserialize(Poco::JSON::Object::Ptr json) -try +void ColumnInfo::deserialize(Poco::JSON::Object::Ptr json) try { id = json->getValue("id"); name = json->getObject("name")->getValue("L"); @@ -192,8 +192,7 @@ catch (const Poco::Exception & e) PartitionDefinition::PartitionDefinition(Poco::JSON::Object::Ptr json) { deserialize(json); } -Poco::JSON::Object::Ptr PartitionDefinition::getJSONObject() const -try +Poco::JSON::Object::Ptr PartitionDefinition::getJSONObject() const try { Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); json->set("id", id); @@ -214,8 +213,7 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (PartitionDef): " + e.displayText(), DB::Exception(e)); } -void PartitionDefinition::deserialize(Poco::JSON::Object::Ptr json) -try +void PartitionDefinition::deserialize(Poco::JSON::Object::Ptr json) try { id = json->getValue("id"); name = json->getObject("name")->getValue("L"); @@ -230,8 +228,7 @@ catch (const Poco::Exception & e) PartitionInfo::PartitionInfo(Poco::JSON::Object::Ptr json) { deserialize(json); } -Poco::JSON::Object::Ptr PartitionInfo::getJSONObject() const -try +Poco::JSON::Object::Ptr PartitionInfo::getJSONObject() const try { Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); @@ -260,8 +257,7 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (PartitionInfo): " + e.displayText(), DB::Exception(e)); } -void PartitionInfo::deserialize(Poco::JSON::Object::Ptr json) -try +void PartitionInfo::deserialize(Poco::JSON::Object::Ptr json) try { type = static_cast(json->getValue("type")); expr = json->getValue("expr"); @@ -285,8 +281,7 @@ catch (const Poco::Exception & e) TableInfo::TableInfo(const String & table_info_json) { deserialize(table_info_json); } -String TableInfo::serialize(bool escaped) const -try +String TableInfo::serialize(bool escaped) const try { std::stringstream buf; @@ -344,8 +339,7 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (TableInfo): " + e.displayText(), DB::Exception(e)); } -void DBInfo::deserialize(const String & json_str) -try +void DBInfo::deserialize(const String & json_str) try { Poco::JSON::Parser parser; Poco::Dynamic::Var result = parser.parse(json_str); @@ -363,8 +357,7 @@ catch (const Poco::Exception & e) DB::Exception(e)); } -void TableInfo::deserialize(const String & json_str) -try +void TableInfo::deserialize(const String & json_str) try { if (json_str.empty()) { diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 128f5ddd13a..efa0e8af5ed 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -179,12 +179,13 @@ struct ColumnInfo COLUMN_FLAGS(M) #undef M + DB::Field convertField(const DB::Field &) const; DB::Field defaultValueToField() const; + Int64 getEnumIndex(const String &) const; CodecFlag getCodecFlag() const; private: DB::Field getDecimalDefaultValue(const String & str) const; - Int64 getEnumIndex(const String &) const; }; enum PartitionType diff --git a/dbms/src/Storages/Transaction/TiKVRecordFormat.h b/dbms/src/Storages/Transaction/TiKVRecordFormat.h index 12b779be25e..5d27978f2f2 100644 --- a/dbms/src/Storages/Transaction/TiKVRecordFormat.h +++ b/dbms/src/Storages/Transaction/TiKVRecordFormat.h @@ -57,21 +57,6 @@ inline UInt64 decodeUInt64Desc(const UInt64 x) { return ~decodeUInt64(x); } inline Int64 decodeInt64(const UInt64 x) { return static_cast(decodeUInt64(x) ^ SIGN_MASK); } -inline TiKVValue EncodeRow(const TiDB::TableInfo & table_info, const std::vector & fields) -{ - if (table_info.columns.size() != fields.size()) - throw Exception("Encoding row has different sizes between columns and values", ErrorCodes::LOGICAL_ERROR); - std::stringstream ss; - for (size_t i = 0; i < fields.size(); i++) - { - const TiDB::ColumnInfo & column_info = table_info.columns[i]; - EncodeDatum(Field(column_info.id), TiDB::CodecFlagInt, ss); - TiDB::DatumBumpy datum = TiDB::DatumBumpy(fields[i], column_info.tp); - EncodeDatum(datum.field(), column_info.getCodecFlag(), ss); - } - return TiKVValue(ss.str()); -} - template inline T read(const char * s) {