From 26b6dd7925d3725f401abd093197c651cc148967 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Tue, 16 Jul 2019 17:20:42 +0800 Subject: [PATCH 01/23] Add sync schema on read --- dbms/src/Debug/MockSchemaSyncer.h | 2 +- .../Interpreters/InterpreterSelectQuery.cpp | 57 +++++++++++++------ .../src/Interpreters/InterpreterSelectQuery.h | 2 +- 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/dbms/src/Debug/MockSchemaSyncer.h b/dbms/src/Debug/MockSchemaSyncer.h index 5c2dd58b592..52f7d22ac5a 100644 --- a/dbms/src/Debug/MockSchemaSyncer.h +++ b/dbms/src/Debug/MockSchemaSyncer.h @@ -12,7 +12,7 @@ class MockSchemaSyncer : public SchemaSyncer bool syncSchemas(Context & context) override; - void syncSchema(Context & context, TableID table_id, bool) override; + void syncSchema(Context & context, TableID table_id, bool lock) override; TableID getTableIdByName(const std::string & database_name, const std::string & table_name) { diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index c312e00422a..2446554e3df 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -78,6 +78,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; extern const int SCHEMA_VERSION_ERROR; + extern const int UNKNOWN_EXCEPTION; } InterpreterSelectQuery::InterpreterSelectQuery( @@ -162,10 +163,7 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) } if (storage) - table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); - - /// Make sure TMT storage schema qualifies the version specified by upper (TiDB or TiSpark). - alignStorageSchema(settings.schema_version); + table_lock = alignStorageSchemaAndLock(settings.schema_version); query_analyzer = std::make_unique( query_ptr, context, storage, source_columns, required_result_column_names, subquery_depth, !only_analyze); @@ -189,24 +187,51 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) } -void InterpreterSelectQuery::alignStorageSchema(Int64 schema_version) +TableStructureReadLockPtr InterpreterSelectQuery::alignStorageSchemaAndLock(Int64 schema_version) { - if (schema_version == DEFAULT_SCHEMA_VERSION || !storage) - return; - + /// Regular read lock for non-TMT or DEFAULT_SCHEMA_VERSION specified. const auto merge_tree = dynamic_cast(storage.get()); - if (!merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) - return; + if (schema_version == DEFAULT_SCHEMA_VERSION || !merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) + return storage->lockStructure(false, __PRETTY_FUNCTION__); + + /// Lambda for schema version check under the read lock. + auto checkSchemaVersionAndLock = [&](bool schema_synced) -> std::tuple { + auto lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + + auto storage_schema_version = merge_tree->getTableInfo().schema_version; + if (storage_schema_version > schema_version) + throw Exception("Storage schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); + + if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version)) + return std::make_tuple(lock, storage_schema_version); + + return std::make_tuple(nullptr, storage_schema_version); + }; - auto storage_schema_version = merge_tree->getTableInfo().schema_version; - if (storage_schema_version < schema_version) + /// Try check and lock once. { - LOG_TRACE(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", syncing schema."); - context.getTMTContext().getSchemaSyncer()->syncSchema(context, merge_tree->getTableInfo().id, false); + auto [lock, storage_schema_version] = checkSchemaVersionAndLock(false); + if (lock) + { + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK, no syncing required."); + return lock; + } + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check not OK."); } - if (storage_schema_version > schema_version) - throw Exception("Storage schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); + /// If first try failed, sync schema and check again. + { + context.getTMTContext().getSchemaSyncer()->syncSchemas(context); + + auto [lock, storage_schema_version] = checkSchemaVersionAndLock(true); + if (lock) + { + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK after syncing."); + return lock; + } + + throw Exception("Shouldn't reach here", ErrorCodes::UNKNOWN_EXCEPTION); + } } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 24ca7ce350a..c0b007690d0 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -111,7 +111,7 @@ class InterpreterSelectQuery : public IInterpreter void init(const Names & required_result_column_names); - void alignStorageSchema(Int64 schema_version); + TableStructureReadLockPtr alignStorageSchemaAndLock(Int64 schema_version); void executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run); From d1ac4f42143680cbba7376bb9f03a0b57d6095b7 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Tue, 16 Jul 2019 19:52:35 +0800 Subject: [PATCH 02/23] Simplify schema syncer interface and adjust mock stuff --- dbms/src/Debug/DBGInvoker.cpp | 3 +- dbms/src/Debug/MockSchemaSyncer.cpp | 55 ++++++----- dbms/src/Debug/MockSchemaSyncer.h | 13 +-- dbms/src/Debug/MockTiDB.cpp | 34 ++----- dbms/src/Debug/MockTiDB.h | 6 +- dbms/src/Debug/dbgFuncSchema.cpp | 92 +++++-------------- dbms/src/Debug/dbgFuncSchema.h | 11 ++- dbms/src/Interpreters/Context.cpp | 4 +- dbms/src/Interpreters/Context.h | 2 +- .../Storages/Transaction/SchemaSyncService.h | 2 + dbms/src/Storages/Transaction/SchemaSyncer.h | 9 -- .../Storages/Transaction/TiDBSchemaSyncer.h | 2 - 12 files changed, 78 insertions(+), 155 deletions(-) diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 0f7b97c11bb..ce80ba28ab1 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -59,8 +59,9 @@ DBGInvoker::DBGInvoker() regFunc("dump_all_region", dbgFuncDumpAllRegion); + regFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService); regFunc("mock_schema_syncer", dbgFuncMockSchemaSyncer); - regFunc("refresh_schema", dbgFuncRefreshSchema); + regFunc("refresh_schemas", dbgFuncRefreshSchemas); } void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement) diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index df41fd98270..116d6295127 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -235,31 +235,36 @@ AlterCommands detectSchemaChanges(const TableInfo & table_info, const TableInfo MockSchemaSyncer::MockSchemaSyncer() : log(&Logger::get("MockSchemaSyncer")) {} -bool MockSchemaSyncer::syncSchemas(Context & /*context*/) +bool MockSchemaSyncer::syncSchemas(Context & context) { - // Don't do full schema sync, we want to test schema sync timing in a fine-grained fashion. - return false; + std::unordered_map new_tables; + MockTiDB::instance().traverseTables([&](const auto & table) { new_tables.emplace(table->id(), table); }); + + for (auto [id, table] : tables) + { + if (new_tables.find(id) == new_tables.end()) + dropTable(table->table_info.db_name, table->table_info.name, context); + } + + for (auto [id, table] : new_tables) + { + std::ignore = id; + syncTable(context, table); + } + + tables.swap(new_tables); + + return true; } -void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lock*/) +void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) { auto & tmt_context = context.getTMTContext(); - /// Get table schema json from TiDB/TiKV. - String table_info_json = getSchemaJson(table_id, context); - if (table_info_json.empty()) - { - /// Table dropped. - auto storage = tmt_context.getStorages().get(table_id); - if (storage == nullptr) - { - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << "doesn't exist in TiDB and doesn't exist in TMT, do nothing."); - return; - } - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << "doesn't exist in TiDB, dropping."); - dropTable(storage->getDatabaseName(), storage->getTableName(), context); - return; - } + /// Get table schema json. + auto table_id = table->id(); + + String table_info_json = table->table_info.serialize(false); LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << " info json: " << table_info_json); @@ -326,8 +331,7 @@ void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lo } /// Table existing, detect schema changes and apply. - auto merge_tree = std::dynamic_pointer_cast(storage); - const TableInfo & orig_table_info = merge_tree->getTableInfo(); + const TableInfo & orig_table_info = storage->getTableInfo(); AlterCommands alter_commands = detectSchemaChanges(table_info, orig_table_info); std::stringstream ss; @@ -345,15 +349,8 @@ void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lo LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str()); - { - // Change internal TableInfo in TMT first. - // TODO: Ideally this should be done within alter function, however we are limited by the narrow alter interface, thus not truly atomic. - auto table_hard_lock = storage->lockStructureForAlter(__PRETTY_FUNCTION__); - merge_tree->setTableInfo(table_info); - } - // Call storage alter to apply schema changes. - storage->alter(alter_commands, table_info.db_name, table_info.name, context); + storage->alterForTMT(alter_commands, table_info, context); LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Schema changes apply done."); diff --git a/dbms/src/Debug/MockSchemaSyncer.h b/dbms/src/Debug/MockSchemaSyncer.h index 52f7d22ac5a..4139f02c425 100644 --- a/dbms/src/Debug/MockSchemaSyncer.h +++ b/dbms/src/Debug/MockSchemaSyncer.h @@ -2,6 +2,8 @@ #include +#include + namespace DB { @@ -12,17 +14,12 @@ class MockSchemaSyncer : public SchemaSyncer bool syncSchemas(Context & context) override; - void syncSchema(Context & context, TableID table_id, bool lock) override; - - TableID getTableIdByName(const std::string & database_name, const std::string & table_name) - { - return MockTiDB::instance().getTableIDByName(database_name, table_name); - } - protected: - String getSchemaJson(TableID table_id, Context & /*context*/) { return MockTiDB::instance().getSchemaJson(table_id); } + void syncTable(Context & context, MockTiDB::TablePtr table); Logger * log; + + std::unordered_map tables; }; } // namespace DB diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 9b937fc1fdc..5ec28ba2598 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -26,33 +26,6 @@ Table::Table(const String & database_name_, const String & table_name_, TableInf : table_info(std::move(table_info_)), database_name(database_name_), table_name(table_name_) {} -String MockTiDB::getSchemaJson(TableID table_id) -{ - std::lock_guard lock(tables_mutex); - - auto it = tables_by_id.find(table_id); - if (it == tables_by_id.end()) - { - return ""; - } - - return it->second->table_info.serialize(false); -} - -TableID MockTiDB::getTableIDByName(const std::string & database_name, const std::string & table_name) -{ - std::lock_guard lock(tables_mutex); - - String qualified_name = database_name + "." + table_name; - auto it = tables_by_name.find(qualified_name); - if (it == tables_by_name.end()) - { - return InvalidTableID; - } - - return it->second->table_info.id; -} - void MockTiDB::dropTable(const String & database_name, const String & table_name) { std::lock_guard lock(tables_mutex); @@ -251,6 +224,13 @@ TablePtr MockTiDB::getTableByName(const String & database_name, const String & t return getTableByNameInternal(database_name, table_name); } +void MockTiDB::traverseTables(std::function f) +{ + std::lock_guard lock(tables_mutex); + + std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) { f(pair.second); }); +} + TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const String & table_name) { String qualified_name = database_name + "." + table_name; diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index 50bd28d7a15..5adc0dce5eb 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -62,10 +62,6 @@ class MockTiDB : public ext::singleton using TablePtr = std::shared_ptr; public: - String getSchemaJson(TableID table_id); - - TableID getTableIDByName(const std::string & database_name, const std::string & table_name); - TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns); TableID newPartition(const String & database_name, const String & table_name, const String & partition_name); @@ -80,6 +76,8 @@ class MockTiDB : public ext::singleton TablePtr getTableByName(const String & database_name, const String & table_name); + void traverseTables(std::function f); + private: TablePtr getTableByNameInternal(const String & database_name, const String & table_name); diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index 731ac90899c..dc45d067e76 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -15,6 +16,23 @@ namespace ErrorCodes extern const int UNKNOWN_TABLE; } // namespace ErrorCodes +void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInvoker::Printer output) +{ + if (args.size() != 1) + throw Exception("Args not matched, should be: enable (true/false)", ErrorCodes::BAD_ARGUMENTS); + + bool enable = safeGet(typeid_cast(*args[0]).value) == "true"; + + if (enable) + context.initializeSchemaSyncService(); + else + context.getSchemaSyncService().reset(); + + std::stringstream ss; + ss << "schema sync service " << (enable ? "enabled" : "disabled"); + output(ss.str()); +} + void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::Printer output) { if (args.size() != 1) @@ -39,79 +57,15 @@ void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::P output(ss.str()); } -void dbgFuncRefreshSchema(Context & context, const ASTs & args, DBGInvoker::Printer output) +void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer output) { - if (args.size() != 2) - throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS); - - std::string database_name = typeid_cast(*args[0]).name; - std::transform(database_name.begin(), database_name.end(), database_name.begin(), ::tolower); - std::string table_name = typeid_cast(*args[1]).name; - std::transform(table_name.begin(), table_name.end(), table_name.begin(), ::tolower); - - auto log = [&](TableID table_id) { - std::stringstream ss; - ss << "refreshed schema for table #" << table_id; - output(ss.str()); - }; - TMTContext & tmt = context.getTMTContext(); auto schema_syncer = tmt.getSchemaSyncer(); - auto mock_schema_syncer = std::dynamic_pointer_cast(schema_syncer); - if (!mock_schema_syncer) - throw Exception("Debug function refresh_schema can only be used under mock schema syncer."); - - TableID table_id = mock_schema_syncer->getTableIdByName(database_name, table_name); - auto storage = tmt.getStorages().getByName(database_name, table_name); - - if (storage == nullptr && table_id == InvalidTableID) - // Table does not exist in CH nor TiDB, error out. - throw Exception("Table " + database_name + "." + table_name + " doesn't exist in tidb", ErrorCodes::UNKNOWN_TABLE); - - if (storage == nullptr && table_id != InvalidTableID) - { - // Table does not exist in CH, but exists in TiDB. - // Might be renamed or never synced. - // Note there will be a dangling table in CH for the following scenario: - // Table t was synced to CH already, then t was renamed (name changed) and truncated (ID changed). - // Then this function was called with the new name given, the table will be synced to a new table. - // User must manually call this function with the old name to remove the dangling table in CH. - mock_schema_syncer->syncSchema(context, table_id, true); - - log(table_id); - - return; - } - - if (table_id == InvalidTableID) - { - // Table exists in CH, but does not exist in TiDB. - // Just sync it using the storage's ID, syncer will then remove it. - mock_schema_syncer->syncSchema(context, storage->getTableInfo().id, true); - - log(table_id); - - return; - } + schema_syncer->syncSchemas(context); - // Table exists in both CH and TiDB. - if (table_id != storage->getTableInfo().id) - { - // Table in TiDB is not the old one, i.e. dropped/renamed then recreated. - // Sync the old one in CH first, then sync the new one. - mock_schema_syncer->syncSchema(context, storage->getTableInfo().id, true); - mock_schema_syncer->syncSchema(context, table_id, true); - - log(table_id); - - return; - } - - // Table in TiDB is the same one as in CH. - // Just sync it. - mock_schema_syncer->syncSchema(context, table_id, true); - - log(table_id); + std::stringstream ss; + ss << "schemas refreshed"; + output(ss.str()); } } // namespace DB diff --git a/dbms/src/Debug/dbgFuncSchema.h b/dbms/src/Debug/dbgFuncSchema.h index 10c56bcf570..9e15ad88ef3 100644 --- a/dbms/src/Debug/dbgFuncSchema.h +++ b/dbms/src/Debug/dbgFuncSchema.h @@ -8,14 +8,19 @@ namespace DB class Context; +// Enable/disable schema sync service. +// Usage: +// ./storages-client.sh "DBGInvoke enable_schema_sync_service(enable)" +void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInvoker::Printer output); + // Change whether to mock schema syncer. // Usage: // ./storages-client.sh "DBGInvoke mock_schema_syncer(enabled)" void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::Printer output); -// Refresh schema of the given table. +// Refresh schemas for all tables. // Usage: -// ./storage-client.sh "DBGInvoke refresh_schema(database_name, table_name)" -void dbgFuncRefreshSchema(Context & context, const ASTs & args, DBGInvoker::Printer output); +// ./storage-client.sh "DBGInvoke refresh_schemas()" +void dbgFuncRefreshSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output); } // namespace DB diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index d4ed73d947e..141d611e7bf 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1450,12 +1450,12 @@ void Context::initializeSchemaSyncService() shared->schema_sync_service = std::make_shared(*this); } -SchemaSyncService & Context::getSchemaSyncService() +SchemaSyncServicePtr Context::getSchemaSyncService() { auto lock = getLock(); if (!shared->schema_sync_service) throw Exception("Schema Sync Service is not initialized.", ErrorCodes::LOGICAL_ERROR); - return *shared->schema_sync_service; + return shared->schema_sync_service; } zkutil::ZooKeeperPtr Context::getZooKeeper() const diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index c4c53b43d41..4b7402c1a6e 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -369,7 +369,7 @@ class Context TiDBService & getTiDBService(); void initializeSchemaSyncService(); - SchemaSyncService & getSchemaSyncService(); + SchemaSyncServicePtr getSchemaSyncService(); Clusters & getClusters() const; std::shared_ptr getCluster(const std::string & cluster_name) const; diff --git a/dbms/src/Storages/Transaction/SchemaSyncService.h b/dbms/src/Storages/Transaction/SchemaSyncService.h index a846f0bf6c5..6a731a98cfb 100644 --- a/dbms/src/Storages/Transaction/SchemaSyncService.h +++ b/dbms/src/Storages/Transaction/SchemaSyncService.h @@ -28,4 +28,6 @@ class SchemaSyncService : public std::enable_shared_from_this Logger * log; }; +using SchemaSyncServicePtr = std::shared_ptr; + } // namespace DB diff --git a/dbms/src/Storages/Transaction/SchemaSyncer.h b/dbms/src/Storages/Transaction/SchemaSyncer.h index 892a5a85839..461c5813853 100644 --- a/dbms/src/Storages/Transaction/SchemaSyncer.h +++ b/dbms/src/Storages/Transaction/SchemaSyncer.h @@ -20,15 +20,6 @@ class SchemaSyncer * @param context */ virtual bool syncSchemas(Context & context) = 0; - - /** - * Synchronize schema between TiDB and CH, to make sure the CH table is new enough to accept data from raft. - * Should be stateless. - * Nevertheless, the implementations may assume that the storage is appropriately locked, thus still not thread-safe. - * @param context - * @param table_id - */ - virtual void syncSchema(Context & context, TableID table_id, bool lock) = 0; }; using SchemaSyncerPtr = std::shared_ptr; diff --git a/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h b/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h index 79a663b9683..cf6fbb1304c 100644 --- a/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h +++ b/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h @@ -52,8 +52,6 @@ struct TiDBSchemaSyncer : public SchemaSyncer return true; } - void syncSchema(Context & context, TableID, bool) override { syncSchemas(context); } - bool tryLoadSchemaDiffs(SchemaGetter & getter, Int64 version, Context & context) { if (isTooOldSchema(cur_version, version)) From d3e2298dd8b98e054545f4fcafcf7aa712cc33b0 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 18 Jul 2019 13:08:55 +0800 Subject: [PATCH 03/23] Rename default schema version setting --- dbms/src/Core/Defines.h | 2 +- dbms/src/Interpreters/Settings.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 48fe68de6b3..85eb7d499d1 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -27,7 +27,7 @@ #define DEFAULT_MAX_COMPRESS_BLOCK_SIZE 1048576 #define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF -#define DEFAULT_SCHEMA_VERSION -1 +#define DEFAULT_UNSPECIFIED_SCHEMA_VERSION -1 /** Which blocks by default read the data (by number of rows). * Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query. diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index b6077e0879e..c67999e6916 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -29,7 +29,7 @@ struct Settings M(SettingString, regions, "", "the region need to be read.") \ M(SettingBool, resolve_locks, false, "tmt read tso.") \ M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \ - M(SettingInt64, schema_version, DEFAULT_SCHEMA_VERSION, "tmt schema version.") \ + M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \ M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \ M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \ From e690046f1ba4ad7c2f457c061193c7a08f8bfcf8 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 18 Jul 2019 13:15:45 +0800 Subject: [PATCH 04/23] Compensate last commit --- dbms/src/Interpreters/InterpreterSelectQuery.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 2446554e3df..b63950e5887 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -189,9 +189,9 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) TableStructureReadLockPtr InterpreterSelectQuery::alignStorageSchemaAndLock(Int64 schema_version) { - /// Regular read lock for non-TMT or DEFAULT_SCHEMA_VERSION specified. + /// Regular read lock for non-TMT or schema version unspecified. const auto merge_tree = dynamic_cast(storage.get()); - if (schema_version == DEFAULT_SCHEMA_VERSION || !merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) + if (schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION || !merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) return storage->lockStructure(false, __PRETTY_FUNCTION__); /// Lambda for schema version check under the read lock. From 49b3fdf31db1192ea688b84a5d45e28855eb4e8a Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 18 Jul 2019 19:08:01 +0800 Subject: [PATCH 05/23] Remove curl library --- CMakeLists.txt | 1 - cmake/find_curl.cmake | 7 ------- 2 files changed, 8 deletions(-) delete mode 100644 cmake/find_curl.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index 5628975d5d6..49971669148 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -265,7 +265,6 @@ include (cmake/find_capnp.cmake) include (cmake/find_llvm.cmake) include (cmake/find_grpc.cmake) include (cmake/find_kvproto.cmake) -include (cmake/find_curl.cmake) include (cmake/find_contrib_lib.cmake) diff --git a/cmake/find_curl.cmake b/cmake/find_curl.cmake deleted file mode 100644 index 74217cbc7cb..00000000000 --- a/cmake/find_curl.cmake +++ /dev/null @@ -1,7 +0,0 @@ -find_package (CURL REQUIRED) - -if (NOT CURL_FOUND) - message (FATAL_ERROR "Curl Not Found!") -endif (NOT CURL_FOUND) - -message (STATUS "Using CURL: ${CURL_INCLUDE_DIRS} : ${CURL_LIBRARIES}") From b5c2a855707d087c79a5b818b3629343b98aaa7d Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 18 Jul 2019 19:31:11 +0800 Subject: [PATCH 06/23] Remove curl from builder image --- docker/builder/Dockerfile | 3 --- 1 file changed, 3 deletions(-) diff --git a/docker/builder/Dockerfile b/docker/builder/Dockerfile index c615e43e934..614d4a777ab 100644 --- a/docker/builder/Dockerfile +++ b/docker/builder/Dockerfile @@ -13,9 +13,6 @@ RUN apt update -y \ # For tests: # bash expect python python-lxml python-termcolor curl perl sudo tzdata && rm -rf /var/lib/apt/lists/* -RUN git clone https://github.com/curl/curl.git \ - && cd /curl && mkdir .build && cd .build && cmake .. && make && make install - RUN git clone https://github.com/grpc/grpc.git && cd grpc && git checkout v1.14.2 && git submodule update --init \ && cd /grpc && mkdir .build && cd .build && cmake .. -DgRPC_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=Release && make install -j $(nproc || grep -c ^processor /proc/cpuinfo) \ && rm -rf /grpc/.build \ From e515c3005daf4656bfd7149660e57850ff1570a7 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 18 Jul 2019 19:55:25 +0800 Subject: [PATCH 07/23] Remove useless codes, init schema syncer based on pd config --- dbms/src/Debug/MockSchemaSyncer.cpp | 1 - dbms/src/Interpreters/Context.cpp | 21 ++----------- dbms/src/Interpreters/Context.h | 5 ++- dbms/src/Server/Server.cpp | 32 ++++++++------------ dbms/src/Storages/Transaction/TMTContext.cpp | 9 ++++-- dbms/src/Storages/Transaction/TMTContext.h | 6 +++- dbms/src/TiDB/TiDBService.cpp | 17 ----------- dbms/src/TiDB/TiDBService.h | 29 ------------------ tests/docker/config/config.xml | 5 --- 9 files changed, 29 insertions(+), 96 deletions(-) delete mode 100644 dbms/src/TiDB/TiDBService.cpp delete mode 100644 dbms/src/TiDB/TiDBService.h diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index 9c5f6e78872..9addcfcc148 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 141d611e7bf..d5e37309145 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -51,7 +51,6 @@ #include #include #include -#include #include #include @@ -152,7 +151,6 @@ struct ContextShared SharedQueriesPtr shared_queries; /// The cache of shared queries. RaftServicePtr raft_service; /// Raft service instance. - TiDBServicePtr tidb_service; /// TiDB service instance. SchemaSyncServicePtr schema_sync_service; /// Schema sync service instance. /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests. @@ -1409,13 +1407,14 @@ void Context::shutdownRaftService() void Context::createTMTContext(const std::vector & pd_addrs, const std::string & learner_key, const std::string & learner_value, + const std::unordered_set & ignore_databases, const std::string & kvstore_path, const std::string & region_mapping_path) { auto lock = getLock(); if (shared->tmt_context) throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR); - shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, kvstore_path, region_mapping_path); + shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, region_mapping_path); } RaftService & Context::getRaftService() @@ -1426,22 +1425,6 @@ RaftService & Context::getRaftService() return *shared->raft_service; } -void Context::initializeTiDBService(const std::string & service_ip, const std::string & status_port, const std::unordered_set & ignore_databases) -{ - auto lock = getLock(); - if (shared->tidb_service) - throw Exception("TiDB Service has already been initialized.", ErrorCodes::LOGICAL_ERROR); - shared->tidb_service = std::make_shared(service_ip, status_port, ignore_databases); -} - -TiDBService & Context::getTiDBService() -{ - auto lock = getLock(); - if (!shared->tidb_service) - throw Exception("TiDB Service is not initialized.", ErrorCodes::LOGICAL_ERROR); - return *shared->tidb_service; -} - void Context::initializeSchemaSyncService() { auto lock = getLock(); diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 4b7402c1a6e..b5ec91baebe 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -361,13 +362,11 @@ class Context void createTMTContext(const std::vector & pd_addrs, const std::string & learner_key, const std::string & learner_value, + const std::unordered_set & ignore_databases, const std::string & kvstore_path, const std::string & region_mapping_path); RaftService & getRaftService(); - void initializeTiDBService(const std::string & service_ip, const std::string & status_port, const std::unordered_set & ignore_databases); - TiDBService & getTiDBService(); - void initializeSchemaSyncService(); SchemaSyncServicePtr getSchemaSyncService(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index b8ca7b8cc13..db7437ebe0e 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -314,6 +314,7 @@ int Server::main(const std::vector & /*args*/) std::vector pd_addrs; std::string learner_key; std::string learner_value; + std::unordered_set ignore_databases; std::string kvstore_path = path + "kvstore/"; std::string region_mapping_path = path + "regmap/"; @@ -354,23 +355,7 @@ int Server::main(const std::vector & /*args*/) learner_value = "engine"; } - if (config().has("raft.kvstore_path")) - { - kvstore_path = config().getString("raft.kvstore_path"); - } - - if (config().has("raft.regmap")) - { - region_mapping_path = config().getString("raft.regmap"); - } - } - // TODO: Remove this config once decent schema syncer is done. - if (config().has("tidb")) - { - String service_ip = config().getString("tidb.service_ip"); - String status_port = config().getString("tidb.status_port"); - std::unordered_set ignore_databases; - if (config().has("tidb.ignore_databases")) + if (config().has("raft.ignore_databases")) { String ignore_dbs = config().getString("tidb.ignore_databases"); Poco::StringTokenizer string_tokens(ignore_dbs, ","); @@ -382,12 +367,21 @@ int Server::main(const std::vector & /*args*/) } LOG_INFO(log, "Found ignore databases:\n" << ss.str()); } - global_context->initializeTiDBService(service_ip, status_port, ignore_databases); + + if (config().has("raft.kvstore_path")) + { + kvstore_path = config().getString("raft.kvstore_path"); + } + + if (config().has("raft.regmap")) + { + region_mapping_path = config().getString("raft.regmap"); + } } { /// create TMTContext - global_context->createTMTContext(pd_addrs, learner_key, learner_value, kvstore_path, region_mapping_path); + global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, region_mapping_path); } /// Then, load remaining databases diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index e68e3c2e457..ed443f6ce04 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -9,14 +10,18 @@ namespace DB { TMTContext::TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, - const std::string & learner_value, const std::string & kvstore_path, const std::string & region_mapping_path) + const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kvstore_path, + const std::string & region_mapping_path) : kvstore(std::make_shared(kvstore_path)), region_table(context, region_mapping_path), pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) : static_cast(new pingcap::pd::Client(addrs))), region_cache(std::make_shared(pd_client, learner_key, learner_value)), rpc_client(std::make_shared()), - schema_syncer(std::make_shared(pd_client, region_cache, rpc_client)) + ignore_databases(ignore_databases_), + schema_syncer(addrs.size() == 0 + ? std::static_pointer_cast(std::make_shared()) + : std::static_pointer_cast(std::make_shared(pd_client, region_cache, rpc_client))) {} void TMTContext::restore() diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index 8b37eda5a0f..8595085d30c 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -8,6 +8,8 @@ #include #pragma GCC diagnostic pop +#include + namespace DB { @@ -35,7 +37,8 @@ class TMTContext : private boost::noncopyable // TODO: get flusher args from config file explicit TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, - const std::string & learner_value, const std::string & kv_store_path, const std::string & region_mapping_path); + const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kv_store_path, + const std::string & region_mapping_path); SchemaSyncerPtr getSchemaSyncer() const; void setSchemaSyncer(SchemaSyncerPtr); @@ -64,6 +67,7 @@ class TMTContext : private boost::noncopyable mutable std::mutex mutex; std::atomic_bool initialized = false; + const std::unordered_set ignore_databases; SchemaSyncerPtr schema_syncer; }; diff --git a/dbms/src/TiDB/TiDBService.cpp b/dbms/src/TiDB/TiDBService.cpp deleted file mode 100644 index 5e49d4a04c0..00000000000 --- a/dbms/src/TiDB/TiDBService.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include - -namespace DB -{ - -TiDBService::TiDBService( - const std::string & service_ip_, const std::string & status_port_, const std::unordered_set & ignore_databases_) - : service_ip(service_ip_), status_port(status_port_), ignore_databases(ignore_databases_) -{} - -const std::string & TiDBService::serviceIp() const { return service_ip; } - -const std::string & TiDBService::statusPort() const { return status_port; } - -const std::unordered_set & TiDBService::ignoreDatabases() const { return ignore_databases; } - -} // namespace DB diff --git a/dbms/src/TiDB/TiDBService.h b/dbms/src/TiDB/TiDBService.h deleted file mode 100644 index 32967ad65a9..00000000000 --- a/dbms/src/TiDB/TiDBService.h +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include -#include -#include - -#include - -namespace DB -{ - -class TiDBService final : public std::enable_shared_from_this, private boost::noncopyable -{ -public: - TiDBService( - const std::string & service_ip_, const std::string & status_port_, const std::unordered_set & ignore_databases_); - const std::string & serviceIp() const; - const std::string & statusPort() const; - const std::unordered_set & ignoreDatabases() const; - -private: - const std::string service_ip; - - const std::string status_port; - - const std::unordered_set ignore_databases; -}; - -} // namespace DB diff --git a/tests/docker/config/config.xml b/tests/docker/config/config.xml index a4ea5f68d53..e6911f1227e 100644 --- a/tests/docker/config/config.xml +++ b/tests/docker/config/config.xml @@ -21,11 +21,6 @@ - - 127.0.0.1 - 10080 - - 8123 9000 9009 From e084771576653878e0d09d5b6cf3bcf35baa9124 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Thu, 18 Jul 2019 22:41:31 +0800 Subject: [PATCH 08/23] Minor fix to schema debug --- dbms/src/Debug/MockSchemaSyncer.cpp | 9 ++------- dbms/src/Debug/dbgFuncSchema.cpp | 10 ++++++++-- dbms/src/Interpreters/Context.cpp | 4 +--- dbms/src/Interpreters/Context.h | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index 9addcfcc148..ef0f2966da4 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -261,13 +261,8 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) auto & tmt_context = context.getTMTContext(); /// Get table schema json. - auto table_id = table->id(); - - String table_info_json = table->table_info.serialize(false); - - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << " info json: " << table_info_json); - - TableInfo table_info(table_info_json, false); + TableInfo table_info = table->table_info; + auto table_id = table_info.id; auto storage = tmt_context.getStorages().get(table_id); diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index dc45d067e76..5431f9baaad 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -24,9 +24,15 @@ void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInv bool enable = safeGet(typeid_cast(*args[0]).value) == "true"; if (enable) - context.initializeSchemaSyncService(); + { + if (!context.getSchemaSyncService()) + context.initializeSchemaSyncService(); + } else - context.getSchemaSyncService().reset(); + { + if (context.getSchemaSyncService()) + context.getSchemaSyncService().reset(); + } std::stringstream ss; ss << "schema sync service " << (enable ? "enabled" : "disabled"); diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index d5e37309145..41f7c3078d9 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1433,11 +1433,9 @@ void Context::initializeSchemaSyncService() shared->schema_sync_service = std::make_shared(*this); } -SchemaSyncServicePtr Context::getSchemaSyncService() +SchemaSyncServicePtr & Context::getSchemaSyncService() { auto lock = getLock(); - if (!shared->schema_sync_service) - throw Exception("Schema Sync Service is not initialized.", ErrorCodes::LOGICAL_ERROR); return shared->schema_sync_service; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index b5ec91baebe..1c1f9b8c041 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -368,7 +368,7 @@ class Context RaftService & getRaftService(); void initializeSchemaSyncService(); - SchemaSyncServicePtr getSchemaSyncService(); + SchemaSyncServicePtr & getSchemaSyncService(); Clusters & getClusters() const; std::shared_ptr getCluster(const std::string & cluster_name) const; From e89c697e419aad7a737f5fa9aeee929368137d26 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 19 Jul 2019 01:44:33 +0800 Subject: [PATCH 09/23] Fix alter tmt and pass tests --- dbms/src/Storages/StorageMergeTree.cpp | 55 +++++++++--------------- dbms/src/Storages/StorageMergeTree.h | 2 + tests/mutable-test/txn_schema/alter.test | 8 ++-- tests/mutable-test/txn_schema/drop.test | 6 +-- 4 files changed, 31 insertions(+), 40 deletions(-) diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index c8f90d96c74..f57e31e3f43 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -303,6 +303,25 @@ void StorageMergeTree::alter( const String & database_name, const String & table_name, const Context & context) +{ + alterInternal(params, database_name, table_name, std::nullopt, context); +} + +void StorageMergeTree::alterForTMT( + const AlterCommands & params, + const TiDB::TableInfo & table_info, + const String & database_name, + const Context & context) +{ + alterInternal(params, database_name, table_info.name, std::optional>(table_info), context); +} + +void StorageMergeTree::alterInternal( + const AlterCommands & params, + const String & database_name, + const String & table_name, + const std::optional> table_info, + const Context & context) { /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. auto merge_blocker = merger.merges_blocker.cancel(); @@ -364,6 +383,8 @@ void StorageMergeTree::alter( context.getDatabase(database_name)->alterTable(context, table_name, new_columns, storage_modifier); setColumns(std::move(new_columns)); + if (table_info) + setTableInfo(table_info->get()); if (primary_key_is_modified) { @@ -382,40 +403,6 @@ void StorageMergeTree::alter( data.loadDataParts(false); } -void StorageMergeTree::alterForTMT( - const AlterCommands & params, - const TiDB::TableInfo & table_info, - const String & database_name, - const Context & context) -{ - const String & table_name = table_info.name; - /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. - auto merge_blocker = merger.merges_blocker.cancel(); - - auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__); - - data.checkAlter(params); - - auto new_columns = data.getColumns(); - params.apply(new_columns); - -// std::vector transactions; - - auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__); - - IDatabase::ASTModifier storage_modifier = [this] (IAST & ast) - { - auto & storage_ast = typeid_cast(ast); - - auto literal = std::make_shared(Field(data.table_info->serialize(true))); - typeid_cast(*storage_ast.engine->arguments).children.back() = literal; - }; - - context.getDatabase(database_name)->alterTable(context, table_name, new_columns, storage_modifier); - setTableInfo(table_info); - setColumns(std::move(new_columns)); -} - /// While exists, marks parts as 'currently_merging' and reserves free space on filesystem. /// It's possible to mark parts before. struct CurrentlyMergingPartsTagger diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 8582e8232d5..6cdfe9ea03d 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -81,6 +81,8 @@ class StorageMergeTree : public ext::shared_ptr_helper, public void alterForTMT(const AlterCommands & params, const TiDB::TableInfo & table_info, const String & database_name, const Context & context); + void alterInternal(const AlterCommands & params, const String & database_name, const String & table_name, const std::optional> table_info, const Context & context); + bool checkTableCanBeDropped() const override; const TableInfo & getTableInfo() const; diff --git a/tests/mutable-test/txn_schema/alter.test b/tests/mutable-test/txn_schema/alter.test index c5500e9f68d..9fa67026043 100644 --- a/tests/mutable-test/txn_schema/alter.test +++ b/tests/mutable-test/txn_schema/alter.test @@ -5,9 +5,11 @@ => DBGInvoke __set_flush_threshold(1000000, 1000000) => DBGInvoke __mock_schema_syncer('true') +=> DBGInvoke __enable_schema_sync_service('false') + # Sync add column by checking missing column in CH when flushing. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => DBGInvoke __put_region(4, 0, 100, default, test) => select col_1 from default.test => DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') @@ -71,7 +73,7 @@ Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier # Sync add column and type change together by checking value overflow in CH when flushing. => DBGInvoke __add_column_to_tidb_table(default, test, 'col_3 UInt8') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => DBGInvoke __modify_column_in_tidb_table(default, test, 'col_3 UInt64') => DBGInvoke __raft_insert_row(default, test, 4, 55, 0, 256) => DBGInvoke __add_column_to_tidb_table(default, test, 'col_4 Nullable(UInt8)') @@ -109,7 +111,7 @@ Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier │ -9223372036854775807 │ 18446744073709551615 │ 1 │ │ 9223372036854775807 │ 18446744073709551615 │ 1 │ └──────────────────────┴──────────────────────┴───────┘ -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => selraw nokvstore col_3 from default.test Received exception from server (version {#WORD}): Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier: col_3. diff --git a/tests/mutable-test/txn_schema/drop.test b/tests/mutable-test/txn_schema/drop.test index f1126511591..bb71443b30f 100644 --- a/tests/mutable-test/txn_schema/drop.test +++ b/tests/mutable-test/txn_schema/drop.test @@ -5,7 +5,7 @@ => DBGInvoke __mock_schema_syncer('true') => DBGInvoke __mock_tidb_table(default, test, 'col_1 String') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => DBGInvoke __put_region(4, 0, 100, default, test) => select col_1 from default.test => DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') @@ -17,7 +17,7 @@ Received exception from server (version {#WORD}): Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => select col_1, col_2 from default.test => DBGInvoke __drop_column_from_tidb_table(default, test, col_2) => DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2') @@ -28,7 +28,7 @@ Received exception from server (version {#WORD}): Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => select col_1, col_2 from default.test => DBGInvoke __modify_column_in_tidb_table(default, test, 'col_2 Nullable(Int64)') => DBGInvoke __raft_insert_row(default, test, 4, 52, 'test2', 256) From 074f521c29a99af41c25413892fc8196f60212de Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 19 Jul 2019 01:55:15 +0800 Subject: [PATCH 10/23] Fix build fail --- dbms/src/Interpreters/Context.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index feee3162f22..afe6faf59c6 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include From 3140836c4f523d8e783a13587409fa4f439a17a6 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 19 Jul 2019 15:49:46 +0800 Subject: [PATCH 11/23] Add lock for mock schema syncer --- dbms/src/Debug/MockSchemaSyncer.cpp | 2 ++ dbms/src/Debug/MockSchemaSyncer.h | 2 ++ 2 files changed, 4 insertions(+) diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index ef0f2966da4..2ce71aae520 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -236,6 +236,8 @@ MockSchemaSyncer::MockSchemaSyncer() : log(&Logger::get("MockSchemaSyncer")) {} bool MockSchemaSyncer::syncSchemas(Context & context) { + std::lock_guard lock(schema_mutex); + std::unordered_map new_tables; MockTiDB::instance().traverseTables([&](const auto & table) { new_tables.emplace(table->id(), table); }); diff --git a/dbms/src/Debug/MockSchemaSyncer.h b/dbms/src/Debug/MockSchemaSyncer.h index 4139f02c425..2409a1ad923 100644 --- a/dbms/src/Debug/MockSchemaSyncer.h +++ b/dbms/src/Debug/MockSchemaSyncer.h @@ -19,6 +19,8 @@ class MockSchemaSyncer : public SchemaSyncer Logger * log; + std::mutex schema_mutex; + std::unordered_map tables; }; From 23ff96c1bbda4e65aa794c8acf4f04167c929468 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 19 Jul 2019 16:15:26 +0800 Subject: [PATCH 12/23] Fix schema sync service init context --- dbms/src/Interpreters/Context.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index afe6faf59c6..2247dd3f04f 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1460,7 +1460,7 @@ void Context::initializeSchemaSyncService() auto lock = getLock(); if (shared->schema_sync_service) throw Exception("Schema Sync Service has already been initialized.", ErrorCodes::LOGICAL_ERROR); - shared->schema_sync_service = std::make_shared(*this); + shared->schema_sync_service = std::make_shared(*global_context); } SchemaSyncServicePtr & Context::getSchemaSyncService() From fb638a785c9c10bc5dc9f72003308d5adedd993d Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 19 Jul 2019 16:37:31 +0800 Subject: [PATCH 13/23] Adjust schema tests --- tests/mutable-test/txn_schema/alter.test | 5 +++-- tests/mutable-test/txn_schema/drop.test | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/mutable-test/txn_schema/alter.test b/tests/mutable-test/txn_schema/alter.test index 9fa67026043..7a3e0f3d2dd 100644 --- a/tests/mutable-test/txn_schema/alter.test +++ b/tests/mutable-test/txn_schema/alter.test @@ -1,12 +1,12 @@ # Preparation. +=> DBGInvoke __enable_schema_sync_service('false') + => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test => DBGInvoke __set_flush_threshold(1000000, 1000000) => DBGInvoke __mock_schema_syncer('true') -=> DBGInvoke __enable_schema_sync_service('false') - # Sync add column by checking missing column in CH when flushing. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String') => DBGInvoke __refresh_schemas() @@ -134,3 +134,4 @@ Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier # Clean up. => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test +=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/mutable-test/txn_schema/drop.test b/tests/mutable-test/txn_schema/drop.test index bb71443b30f..dee861d85df 100644 --- a/tests/mutable-test/txn_schema/drop.test +++ b/tests/mutable-test/txn_schema/drop.test @@ -1,3 +1,5 @@ +=> DBGInvoke __enable_schema_sync_service('false') + => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test @@ -40,3 +42,4 @@ Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test +=> DBGInvoke __enable_schema_sync_service('true') From fc10c2e39f20b315389ecb680bda699e14e9b636 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 19 Jul 2019 17:24:10 +0800 Subject: [PATCH 14/23] Not sync if no schema change detected --- dbms/src/Debug/MockSchemaSyncer.cpp | 30 +++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index 2ce71aae520..ba31b1a2325 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -263,7 +263,7 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) auto & tmt_context = context.getTMTContext(); /// Get table schema json. - TableInfo table_info = table->table_info; + const TableInfo & table_info = table->table_info; auto table_id = table_info.id; auto storage = tmt_context.getStorages().get(table_id); @@ -280,17 +280,16 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) auto create_table_internal = [&]() { LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Creating table " << table_info.name); createTable(table_info, context); - auto logical_storage = std::static_pointer_cast(context.getTable(table_info.db_name, table_info.name)); - context.getTMTContext().getStorages().put(logical_storage); - /// Mangle for partition table. - bool is_partition_table = table_info.manglePartitionTableIfNeeded(table_id); - if (is_partition_table && !context.isTableExist(table_info.db_name, table_info.name)) + /// Create sub-table for partitions if any. + if (table_info.is_partition_table) { - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Re-creating table after mangling partition table " << table_info.name); - createTable(table_info, context); - auto physical_storage = std::static_pointer_cast(context.getTable(table_info.db_name, table_info.name)); - context.getTMTContext().getStorages().put(physical_storage); + // create partition table. + for (auto part_def : table_info.partition.definitions) + { + auto part_table_info = table_info.producePartitionTableInfo(part_def.id); + createTable(*part_table_info, context); + } } }; @@ -345,12 +344,15 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str()); - // Call storage alter to apply schema changes. - storage->alterForTMT(alter_commands, table_info, table->table_info.db_name, context); + if (!alter_commands.empty()) + { + // Call storage alter to apply schema changes. + storage->alterForTMT(alter_commands, table_info, table->table_info.db_name, context); - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Schema changes apply done."); + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Schema changes apply done."); - // TODO: Apply schema changes to partition tables. + // TODO: Apply schema changes to partition tables. + } } } // namespace DB From d3b0af9c681401cd40c80ec9e18638d73e96fa09 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Fri, 19 Jul 2019 17:26:25 +0800 Subject: [PATCH 15/23] Adjust txn mock tests --- .../mutable-test/txn_mock/data_only_in_region.test | 5 ++--- tests/mutable-test/txn_mock/delete.test | 4 ++-- tests/mutable-test/txn_mock/insert.test | 4 ++-- tests/mutable-test/txn_mock/order_by.test | 4 ++-- tests/mutable-test/txn_mock/partition_table.test | 13 +++++++------ tests/mutable-test/txn_mock/region.test | 4 ++-- .../txn_mock/same_version_diff_delmark.test | 4 ++-- tests/mutable-test/txn_mock/select.test | 4 ++-- tests/mutable-test/txn_mock/selraw.test | 4 ++-- tests/mutable-test/txn_mock/snapshot.test | 4 ++-- tests/mutable-test/txn_mock/snapshot_cache.test | 4 ++-- 11 files changed, 27 insertions(+), 27 deletions(-) diff --git a/tests/mutable-test/txn_mock/data_only_in_region.test b/tests/mutable-test/txn_mock/data_only_in_region.test index be9ee484f41..eb83d6e29ed 100644 --- a/tests/mutable-test/txn_mock/data_only_in_region.test +++ b/tests/mutable-test/txn_mock/data_only_in_region.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke __set_flush_threshold(100000, 20000000) => DBGInvoke mock_schema_syncer('true') @@ -9,7 +9,6 @@ => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') => DBGInvoke __put_region(4, 0, 100, default, test) -=> DBGInvoke __refresh_schema(default, test) => DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', -1) => DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', -2) @@ -36,5 +35,5 @@ │ 0 │ └─────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/delete.test b/tests/mutable-test/txn_mock/delete.test index 846be028ea0..dea33f90e8e 100644 --- a/tests/mutable-test/txn_mock/delete.test +++ b/tests/mutable-test/txn_mock/delete.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke __set_flush_threshold(100000, 0) => DBGInvoke mock_schema_syncer('true') @@ -62,5 +62,5 @@ │ test3 │ 3 │ 52 │ 0 │ └───────┴───────┴─────────────┴───────────────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/insert.test b/tests/mutable-test/txn_mock/insert.test index 70c28de269c..96731ac1283 100644 --- a/tests/mutable-test/txn_mock/insert.test +++ b/tests/mutable-test/txn_mock/insert.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke __set_flush_threshold(100000, 0) => DBGInvoke mock_schema_syncer('true') @@ -33,5 +33,5 @@ │ 4 │ └─────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/order_by.test b/tests/mutable-test/txn_mock/order_by.test index ae2db0badf6..42084ab5c38 100644 --- a/tests/mutable-test/txn_mock/order_by.test +++ b/tests/mutable-test/txn_mock/order_by.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke __set_flush_threshold(100000, 0) => DBGInvoke mock_schema_syncer('true') @@ -33,5 +33,5 @@ │ test1 │ -1 │ 50 │ └───────┴───────┴─────────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/partition_table.test b/tests/mutable-test/txn_mock/partition_table.test index 003f8c36f9b..c2d67484f10 100644 --- a/tests/mutable-test/txn_mock/partition_table.test +++ b/tests/mutable-test/txn_mock/partition_table.test @@ -1,22 +1,22 @@ +=> DBGInvoke __enable_schema_sync_service('false') + +=> DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test => drop table if exists default.test_p1 => drop table if exists default.test_p2 -=> DBGInvoke __drop_tidb_table(default, test) => DBGInvoke __mock_schema_syncer('true') => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') - => DBGInvoke __mock_tidb_partition(default, test, p1) -=> DBGInvoke __put_region(4, 0, 100, default, test, p1) +=> DBGInvoke __mock_tidb_partition(default, test, p2) +=> DBGInvoke __put_region(4, 0, 100, default, test, p1) => DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 1) => DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 2) => DBGInvoke __try_flush_region(4) -=> DBGInvoke __mock_tidb_partition(default, test, p2) => DBGInvoke __put_region(5, 100, 200, default, test, p2) - => DBGInvoke __raft_insert_row(default, test, 5, 152, 'test3', 3) => DBGInvoke __raft_insert_row(default, test, 5, 153, 'test4', 4) => DBGInvoke __try_flush_region(5) @@ -44,7 +44,8 @@ │ 2 │ └─────────┘ +=> DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test => drop table if exists default.test_p1 => drop table if exists default.test_p2 -=> DBGInvoke __drop_tidb_table(default, test) +=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/mutable-test/txn_mock/region.test b/tests/mutable-test/txn_mock/region.test index a5a455008e4..b66270ebb10 100644 --- a/tests/mutable-test/txn_mock/region.test +++ b/tests/mutable-test/txn_mock/region.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke __set_flush_threshold(1, 2330) => DBGInvoke mock_schema_syncer('true') @@ -32,5 +32,5 @@ │ 80004 │ └─────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/same_version_diff_delmark.test b/tests/mutable-test/txn_mock/same_version_diff_delmark.test index 1540953ec18..7969c086d0f 100644 --- a/tests/mutable-test/txn_mock/same_version_diff_delmark.test +++ b/tests/mutable-test/txn_mock/same_version_diff_delmark.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke mock_schema_syncer('true') ┌─mock_schema_syncer("true")─┐ │ mock schema syncer enabled │ @@ -36,5 +36,5 @@ │ 12 │ 3 │ 4 │ 0 │ │ 0 │ 3 │ 4 │ 1 │ └───────┴─────────────┴───────────────────┴───────────────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/select.test b/tests/mutable-test/txn_mock/select.test index 3f9dec491e5..38eb2db0d59 100644 --- a/tests/mutable-test/txn_mock/select.test +++ b/tests/mutable-test/txn_mock/select.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke __set_flush_threshold(100000, 0) => DBGInvoke mock_schema_syncer('true') @@ -41,5 +41,5 @@ │ 2 │ test2 │ └────────────┴───────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/selraw.test b/tests/mutable-test/txn_mock/selraw.test index 7d422417c1f..3a023a4948c 100644 --- a/tests/mutable-test/txn_mock/selraw.test +++ b/tests/mutable-test/txn_mock/selraw.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke __set_flush_threshold(100000, 0) => DBGInvoke mock_schema_syncer('true') @@ -42,5 +42,5 @@ │ 2 │ test2 │ └────────────┴───────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/snapshot.test b/tests/mutable-test/txn_mock/snapshot.test index ee47dc2408e..025d4e1f96f 100644 --- a/tests/mutable-test/txn_mock/snapshot.test +++ b/tests/mutable-test/txn_mock/snapshot.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke mock_schema_syncer('true') ┌─mock_schema_syncer("true")─┐ │ mock schema syncer enabled │ @@ -55,5 +55,5 @@ │ 13 │ 1 │ 3 │ 0 │ │ 11 │ 2 │ 3 │ 0 │ └───────┴─────────────┴───────────────────┴───────────────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_mock/snapshot_cache.test b/tests/mutable-test/txn_mock/snapshot_cache.test index 510e6525f4f..3f6be68c0ff 100644 --- a/tests/mutable-test/txn_mock/snapshot_cache.test +++ b/tests/mutable-test/txn_mock/snapshot_cache.test @@ -1,5 +1,5 @@ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test => DBGInvoke mock_schema_syncer('true') ┌─mock_schema_syncer("true")─┐ │ mock schema syncer enabled │ @@ -54,5 +54,5 @@ │ 19 │ 2 │ │ 20 │ 4 │ └───────┴─────────────┘ -=> drop table if exists default.test => DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test From 517793b6c459c8252dc399102405d4ca8ddcf43b Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Sat, 20 Jul 2019 00:59:04 +0800 Subject: [PATCH 16/23] Fix default value bug --- dbms/src/Debug/MockTiDB.cpp | 4 ++++ dbms/src/Storages/Transaction/RegionBlockReader.cpp | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 5ec28ba2598..6f2d4928840 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -97,6 +97,10 @@ ColumnInfo getColumnInfoFromColumn(const NameAndTypePair & column, ColumnID id) if (checkDataType(nested_type)) column_info.tp = TiDB::TypeLongLong; + // Default value. + // TODO: Parse default value and set flag properly. + column_info.setNoDefaultValueFlag(); + return column_info; } diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index 59d6dfdefcf..fd4685058d3 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -185,8 +185,12 @@ std::tuple readRegionBlock(const TiDB::TableInfo & table_info, return std::make_tuple(block, false); row.emplace_back(Field(column.id)); - // Fill `zero` value if NOT NULL specified or else NULL. - row.push_back(column.defaultValueToField()); + if (column.hasNoDefaultValueFlag()) + // Fill `zero` value if NOT NULL specified or else NULL. + row.push_back(column.hasNotNullFlag() ? GenDecodeRow(column.getCodecFlag()) : Field()); + else + // Fill default value. + row.push_back(column.defaultValueToField()); } // Remove values of non-existing columns, which could be data inserted (but not flushed) before DDLs that drop some columns. From 0e510f371f10b4ea6647022f36bdd6750639001a Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Sat, 20 Jul 2019 01:00:45 +0800 Subject: [PATCH 17/23] Rename some tests --- tests/mutable-test/txn_schema/{alter.test => alter_on_write.test} | 0 tests/mutable-test/txn_schema/{drop.test => drop_on_write.test} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/mutable-test/txn_schema/{alter.test => alter_on_write.test} (100%) rename tests/mutable-test/txn_schema/{drop.test => drop_on_write.test} (100%) diff --git a/tests/mutable-test/txn_schema/alter.test b/tests/mutable-test/txn_schema/alter_on_write.test similarity index 100% rename from tests/mutable-test/txn_schema/alter.test rename to tests/mutable-test/txn_schema/alter_on_write.test diff --git a/tests/mutable-test/txn_schema/drop.test b/tests/mutable-test/txn_schema/drop_on_write.test similarity index 100% rename from tests/mutable-test/txn_schema/drop.test rename to tests/mutable-test/txn_schema/drop_on_write.test From 32e9f3d5bb42bc4d2be0036a14d7b620b78849e1 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Sat, 20 Jul 2019 02:26:26 +0800 Subject: [PATCH 18/23] Remove sync schema test --- .../Storages/Transaction/tests/CMakeLists.txt | 3 -- .../Transaction/tests/sync_schema.cpp | 42 ------------------- 2 files changed, 45 deletions(-) delete mode 100644 dbms/src/Storages/Transaction/tests/sync_schema.cpp diff --git a/dbms/src/Storages/Transaction/tests/CMakeLists.txt b/dbms/src/Storages/Transaction/tests/CMakeLists.txt index 9b80fc19f3c..fbf4fb0d4e8 100644 --- a/dbms/src/Storages/Transaction/tests/CMakeLists.txt +++ b/dbms/src/Storages/Transaction/tests/CMakeLists.txt @@ -1,8 +1,5 @@ include_directories (${CMAKE_CURRENT_BINARY_DIR}) -add_executable (sync_schema sync_schema.cpp) -target_link_libraries (sync_schema dbms) - add_executable (tikv_keyvalue tikv_keyvalue.cpp) target_link_libraries (tikv_keyvalue dbms) diff --git a/dbms/src/Storages/Transaction/tests/sync_schema.cpp b/dbms/src/Storages/Transaction/tests/sync_schema.cpp deleted file mode 100644 index 1fbfd365274..00000000000 --- a/dbms/src/Storages/Transaction/tests/sync_schema.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include -#include -#include - - -namespace DB -{ -using namespace TiDB; - -String getTiDBTableInfoJsonByCurl(TableID table_id, Context & context); -String createDatabaseStmt(const TableInfo & table_info); -String createTableStmt(const TableInfo & table_info); - -} - -int main(int, char ** args) -{ - using namespace DB; - using namespace TiDB; - - auto context = Context::createGlobal(); - - std::istringstream is(args[1]); - TableID table_id; - is >> table_id; - - String json_str = getTiDBTableInfoJsonByCurl(table_id, context); - - std::cout << json_str << std::endl; - - TableInfo table_info(json_str, false); - - String create_db_stmt = createDatabaseStmt(table_info); - - std::cout << create_db_stmt << std::endl; - - String create_table_stmt = createTableStmt(table_info); - - std::cout << create_table_stmt << std::endl; - - return 0; -} From c6d4f865ae0f7ac6a7627e58725bbb2e82f9df9a Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Sat, 20 Jul 2019 03:41:55 +0800 Subject: [PATCH 19/23] Remove a lot useless code --- dbms/src/Debug/MockSchemaSyncer.cpp | 2 +- .../Storages/Transaction/SchemaBuilder.cpp | 35 +++--- dbms/src/Storages/Transaction/SchemaBuilder.h | 6 +- dbms/src/Storages/Transaction/TiDB.cpp | 119 ------------------ dbms/src/Storages/Transaction/TiDB.h | 39 +----- .../Storages/Transaction/tests/table_info.cpp | 12 +- 6 files changed, 32 insertions(+), 181 deletions(-) diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index ba31b1a2325..6fc68cea01e 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -288,7 +288,7 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) for (auto part_def : table_info.partition.definitions) { auto part_table_info = table_info.producePartitionTableInfo(part_def.id); - createTable(*part_table_info, context); + createTable(part_table_info, context); } } }; diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.cpp b/dbms/src/Storages/Transaction/SchemaBuilder.cpp index aaabf45fb6f..4524fe71411 100644 --- a/dbms/src/Storages/Transaction/SchemaBuilder.cpp +++ b/dbms/src/Storages/Transaction/SchemaBuilder.cpp @@ -4,10 +4,10 @@ #include #include #include +#include #include #include #include -#include #include #include #include @@ -37,7 +37,8 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TiDB::TableInfo & t command.type = AlterCommand::ADD_COLUMN; command.column_name = column_info.name; command.data_type = getDataTypeByColumnInfo(column_info); - if (!column_info.origin_default_value.isEmpty()) { + if (!column_info.origin_default_value.isEmpty()) + { LOG_DEBUG(log, "add default value for column: " + column_info.name); command.default_expression = ASTPtr(new ASTLiteral(column_info.defaultValueToField())); } @@ -92,14 +93,13 @@ void SchemaBuilder::applyAlterTableImpl(TiDB::TableInfoPtr table_info, const Str for (auto part_def : table_info->partition.definitions) { auto new_table_info = table_info->producePartitionTableInfo(part_def.id); - storage->alterForTMT(commands, *new_table_info, db_name, context); + storage->alterForTMT(commands, new_table_info, db_name, context); } } } void SchemaBuilder::applyAlterTable(TiDB::DBInfoPtr dbInfo, Int64 table_id) { - auto table_info = getter.getTableInfo(dbInfo->id, table_id); auto & tmt_context = context.getTMTContext(); auto storage = static_cast(tmt_context.getStorages().get(table_id).get()); @@ -279,16 +279,16 @@ String createTableStmt(const DBInfo & db_info, const TableInfo & table_info) return stmt; } -void SchemaBuilder::applyCreatePhysicalTableImpl(TiDB::DBInfoPtr db_info, TiDB::TableInfoPtr table_info) +void SchemaBuilder::applyCreatePhysicalTableImpl(const TiDB::DBInfo & db_info, const TiDB::TableInfo & table_info) { - String stmt = createTableStmt(*db_info, *table_info); + String stmt = createTableStmt(db_info, table_info); ParserCreateQuery parser; - ASTPtr ast = parseQuery(parser, stmt.data(), stmt.data() + stmt.size(), "from syncSchema " + table_info->name, 0); + ASTPtr ast = parseQuery(parser, stmt.data(), stmt.data() + stmt.size(), "from syncSchema " + table_info.name, 0); ASTCreateQuery * ast_create_query = typeid_cast(ast.get()); ast_create_query->attach = true; - ast_create_query->database = db_info->name; + ast_create_query->database = db_info.name; InterpreterCreateQuery interpreter(ast, context); interpreter.setInternal(true); @@ -305,17 +305,17 @@ void SchemaBuilder::applyCreateTable(TiDB::DBInfoPtr db_info, Int64 table_id) // this table is dropped. return; } - applyCreateTableImpl(db_info, table_info); + applyCreateTableImpl(*db_info, *table_info); } -void SchemaBuilder::applyCreateTableImpl(TiDB::DBInfoPtr db_info, TiDB::TableInfoPtr table_info) +void SchemaBuilder::applyCreateTableImpl(const TiDB::DBInfo & db_info, const TiDB::TableInfo & table_info) { - if (table_info->is_partition_table) + if (table_info.is_partition_table) { // create partition table. - for (auto part_def : table_info->partition.definitions) + for (auto part_def : table_info.partition.definitions) { - auto new_table_info = table_info->producePartitionTableInfo(part_def.id); + auto new_table_info = table_info.producePartitionTableInfo(part_def.id); applyCreatePhysicalTableImpl(db_info, new_table_info); } } @@ -347,7 +347,7 @@ void SchemaBuilder::applyDropTable(TiDB::DBInfoPtr dbInfo, Int64 table_id) for (auto part_def : table_info.partition.definitions) { auto new_table_info = table_info.producePartitionTableInfo(part_def.id); - applyDropTableImpl(database_name, new_table_info->name); + applyDropTableImpl(database_name, new_table_info.name); } } // and drop logic table. @@ -370,9 +370,11 @@ void SchemaBuilder::updateDB(TiDB::DBInfoPtr db_info) table_ids.insert(table->id); auto storage_map = tmt_context.getStorages().getAllStorage(); - for (auto it = storage_map.begin(); it != storage_map.end(); it++) { + for (auto it = storage_map.begin(); it != storage_map.end(); it++) + { auto storage = it->second; - if(storage->getDatabaseName() == db_info->name && table_ids.count(storage->getTableInfo().id) == 0) { + if (storage->getDatabaseName() == db_info->name && table_ids.count(storage->getTableInfo().id) == 0) + { // Drop Table applyDropTableImpl(db_info->name, storage->getTableName()); LOG_DEBUG(log, "Table " + db_info->name + "." + storage->getTableName() + " is dropped during schema all schemas"); @@ -391,7 +393,6 @@ void SchemaBuilder::updateDB(TiDB::DBInfoPtr db_info) applyAlterTableImpl(table, db_info->name, storage); } } - } // end namespace diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.h b/dbms/src/Storages/Transaction/SchemaBuilder.h index f78bd3939ef..bc2c1d6a267 100644 --- a/dbms/src/Storages/Transaction/SchemaBuilder.h +++ b/dbms/src/Storages/Transaction/SchemaBuilder.h @@ -33,7 +33,6 @@ struct SchemaBuilder void applyCreateSchemaImpl(TiDB::DBInfoPtr db_info); - void applyCreateTable(TiDB::DBInfoPtr dbInfo, Int64 table_id); void applyDropTable(TiDB::DBInfoPtr dbInfo, Int64 table_id); @@ -46,12 +45,11 @@ struct SchemaBuilder //void applyDropPartition(TiDB::DBInfoPtr dbInfo, Int64 table_id); - void applyCreatePhysicalTableImpl(TiDB::DBInfoPtr db_info, TiDB::TableInfoPtr table_info); + void applyCreatePhysicalTableImpl(const TiDB::DBInfo & db_info, const TiDB::TableInfo & table_info); - void applyCreateTableImpl(TiDB::DBInfoPtr db_info, TiDB::TableInfoPtr table_info); + void applyCreateTableImpl(const TiDB::DBInfo & db_info, const TiDB::TableInfo & table_info); void applyDropTableImpl(const String &, const String &); }; -bool applyCreateSchemaImpl(TiDB::DBInfoPtr db_info); } // namespace DB diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index 8b2c23dec55..fcd8ba3b9a6 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -2,128 +2,9 @@ #include #include -namespace JsonSer -{ - -using DB::WriteBuffer; - -template -void serValue(WriteBuffer & buf, const bool & b) -{ - writeString(b ? "true" : "false", buf); -} - -template -typename std::enable_if_t::value || std::is_enum::value> serValue(WriteBuffer & buf, T i) -{ - writeIntText(static_cast(i), buf); -} - -template -typename std::enable_if_t::value> serValue(WriteBuffer & buf, T f) -{ - writeFloadText(f, buf); -} - -// String that has been already encoded as JSON. -struct JsonString : public std::string -{ -}; - -template -void serValue(WriteBuffer & buf, const JsonString & qs) -{ - writeString(qs, buf); -} - -template -void serValue(WriteBuffer & buf, const std::string & s) -{ - writeJSONString(s, buf); -} - -template -void serValue(WriteBuffer & buf, const std::vector & v) -{ - writeString("[", buf); - bool first = true; - for (auto & e : v) - { - first = first ? false : (writeString(",", buf), false); - serValue(buf, e); - } - writeString("]", buf); -} - -template > -void serValue(WriteBuffer & buf, const std::function & s) -{ - s(buf); -} - -template -std::function Nullable(const T & value, bool is_null) -{ - return [value, is_null](WriteBuffer & buf) { is_null ? writeString("null", buf) : serValue(buf, value); }; -} - -template -struct Field -{ - Field(std::string name_, T value_, bool skip_ = false) : name(std::move(name_)), value(std::move(value_)), skip(skip_) {} - std::string name; - T value; - bool skip; -}; - -template -void serField(WriteBuffer & buf, const Field & field) -{ - writeJSONString(field.name, buf); - writeString(":", buf); - serValue(buf, field.value); -} - -template -void serFields(WriteBuffer & buf, const T & last) -{ - if (!last.skip) - serField(buf, last); -} - -template -void serFields(WriteBuffer & buf, const T & first, const Rest &... rest) -{ - if (!first.skip) - { - serField(buf, first); - writeString(",", buf); - } - serFields(buf, rest...); -} - -template -void serValue(WriteBuffer & buf, const T &... fields) -{ - writeString("{", buf); - serFields(buf, fields...); - writeString("}", buf); -} - -template -std::function Struct(const T &... fields) -{ - return [fields...](WriteBuffer & buf) { serValue(buf, fields...); }; -} - -} // namespace JsonSer - namespace TiDB { - using DB::Field; -using DB::ReadBufferFromString; -using DB::WriteBuffer; using DB::WriteBufferFromOwnString; ColumnInfo::ColumnInfo(Poco::JSON::Object::Ptr json) { deserialize(json); } diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 88a52781d6b..a07e65cc89a 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -263,7 +263,7 @@ struct TableInfo ColumnID getColumnID(const String & name) const; - TableInfoPtr producePartitionTableInfo(TableID table_or_partition_id) const + TableInfo producePartitionTableInfo(TableID table_or_partition_id) const { // // Some sanity checks for partition table. @@ -280,44 +280,15 @@ struct TableInfo DB::ErrorCodes::LOGICAL_ERROR); // This is a TiDB partition table, adjust the table ID by making it to physical table ID (partition ID). - TableInfoPtr new_table = std::make_shared(*this); - new_table->belonging_table_id = id; - new_table->id = table_or_partition_id; + TableInfo new_table = *this; + new_table.belonging_table_id = id; + new_table.id = table_or_partition_id; // Mangle the table name by appending partition name. - new_table->name += "_" + std::to_string(table_or_partition_id); + new_table.name += "_" + std::to_string(table_or_partition_id); return new_table; } - - bool manglePartitionTableIfNeeded(TableID table_or_partition_id) - { - if (id == table_or_partition_id) - // Non-partition table. - return false; - - // Some sanity checks for partition table. - if (unlikely(!(is_partition_table && partition.enable))) - throw Exception("Table ID " + std::to_string(id) + " seeing partition ID " + std::to_string(table_or_partition_id) - + " but it's not a partition table", - DB::ErrorCodes::LOGICAL_ERROR); - - if (unlikely(std::find_if(partition.definitions.begin(), partition.definitions.end(), [table_or_partition_id](const auto & d) { - return d.id == table_or_partition_id; - }) == partition.definitions.end())) - throw Exception( - "Couldn't find partition with ID " + std::to_string(table_or_partition_id) + " in table ID " + std::to_string(id), - DB::ErrorCodes::LOGICAL_ERROR); - - // This is a TiDB partition table, adjust the table ID by making it to physical table ID (partition ID). - belonging_table_id = id; - id = table_or_partition_id; - - // Mangle the table name by appending partition name. - name += "_" + std::to_string(table_or_partition_id); - - return true; - } }; using DBInfoPtr = std::shared_ptr; diff --git a/dbms/src/Storages/Transaction/tests/table_info.cpp b/dbms/src/Storages/Transaction/tests/table_info.cpp index 66c6a7b1065..078f54bd102 100644 --- a/dbms/src/Storages/Transaction/tests/table_info.cpp +++ b/dbms/src/Storages/Transaction/tests/table_info.cpp @@ -29,8 +29,8 @@ struct Case void verifyTableInfo() const { DBInfo db_info(db_info_json); - TableInfo table_info1(table_info_json); - table_info1.manglePartitionTableIfNeeded(table_or_partition_id); + TableInfo tmp(table_info_json); + TableInfo table_info1 = tmp.producePartitionTableInfo(table_or_partition_id); auto json1 = table_info1.serialize(false); TableInfo table_info2(json1); auto json2 = table_info2.serialize(false); @@ -59,8 +59,7 @@ struct Case } }; -int main(int, char **) -try +int main(int, char **) try { auto cases = { Case{ @@ -113,6 +112,7 @@ try return 0; } -catch (const Poco::Exception & e) { - std::cout< Date: Sat, 20 Jul 2019 23:24:57 +0800 Subject: [PATCH 20/23] Refine schema sync on read, and add drop on read test --- dbms/src/Debug/MockSchemaSyncer.cpp | 30 ++++---- .../Interpreters/InterpreterSelectQuery.cpp | 75 ++++++++++++------- .../src/Interpreters/InterpreterSelectQuery.h | 3 +- dbms/src/Interpreters/Settings.h | 2 +- .../mutable-test/txn_schema/drop_on_read.test | 26 +++++++ 5 files changed, 92 insertions(+), 44 deletions(-) create mode 100644 tests/mutable-test/txn_schema/drop_on_read.test diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index 6fc68cea01e..13261c2e21c 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -328,24 +328,22 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) /// Table existing, detect schema changes and apply. const TableInfo & orig_table_info = storage->getTableInfo(); AlterCommands alter_commands = detectSchemaChanges(table_info, orig_table_info); - - std::stringstream ss; - ss << "Detected schema changes: "; - for (const auto & command : alter_commands) - { - // TODO: Other command types. - if (command.type == AlterCommand::ADD_COLUMN) - ss << "ADD COLUMN " << command.column_name << " " << command.data_type->getName() << ", "; - else if (command.type == AlterCommand::DROP_COLUMN) - ss << "DROP COLUMN " << command.column_name << ", "; - else if (command.type == AlterCommand::MODIFY_COLUMN) - ss << "MODIFY COLUMN " << command.column_name << " " << command.data_type->getName() << ", "; - } - - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str()); - if (!alter_commands.empty()) { + std::stringstream ss; + ss << "Detected schema changes: "; + for (const auto & command : alter_commands) + { + // TODO: Other command types. + if (command.type == AlterCommand::ADD_COLUMN) + ss << "ADD COLUMN " << command.column_name << " " << command.data_type->getName() << ", "; + else if (command.type == AlterCommand::DROP_COLUMN) + ss << "DROP COLUMN " << command.column_name << ", "; + else if (command.type == AlterCommand::MODIFY_COLUMN) + ss << "MODIFY COLUMN " << command.column_name << " " << command.data_type->getName() << ", "; + } + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str()); + // Call storage alter to apply schema changes. storage->alterForTMT(alter_commands, table_info, table->table_info.db_name, context); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index bc618c0a92f..6cc503d8342 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -150,6 +150,7 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) { /// Read from table function. storage = context.getQueryContext().executeTableFunction(table_expression); + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); } else { @@ -159,12 +160,17 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) getDatabaseAndTableNames(database_name, table_name); - storage = context.getTable(database_name, table_name); + if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION) + { + storage = context.getTable(database_name, table_name); + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + } + else + { + getAndLockStorageWithSchemaVersion(database_name, table_name, settings.schema_version); + } } - if (storage) - table_lock = alignStorageSchemaAndLock(settings.schema_version); - query_analyzer = std::make_unique( query_ptr, context, storage, source_columns, required_result_column_names, subquery_depth, !only_analyze); @@ -187,47 +193,64 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) } -TableStructureReadLockPtr InterpreterSelectQuery::alignStorageSchemaAndLock(Int64 schema_version) +void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version) { - /// Regular read lock for non-TMT or schema version unspecified. - const auto merge_tree = dynamic_cast(storage.get()); - if (schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION || !merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) - return storage->lockStructure(false, __PRETTY_FUNCTION__); + String qualified_name = database_name + "." + table_name; + + /// Lambda for get storage, then align schema version under the read lock. + auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple { + /// Get storage in case it's dropped then re-created. + // If schema synced, call getTable without try, leading to exception on table not existing. + auto storage_ = schema_synced ? context.getTable(database_name, table_name) : context.tryGetTable(database_name, table_name); + if (!storage_) + return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false); + + const auto merge_tree = dynamic_cast(storage_.get()); + if (!merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) + throw Exception("Specifying schema_version for non-TMT storage: " + storage->getName() + ", table: " + qualified_name + " is not allowed", ErrorCodes::LOGICAL_ERROR); - /// Lambda for schema version check under the read lock. - auto checkSchemaVersionAndLock = [&](bool schema_synced) -> std::tuple { - auto lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + /// Lock storage. + auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__); + /// Check schema version. auto storage_schema_version = merge_tree->getTableInfo().schema_version; if (storage_schema_version > schema_version) - throw Exception("Storage schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); + throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR); if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version)) - return std::make_tuple(lock, storage_schema_version); + return std::make_tuple(storage_, lock, storage_schema_version, true); - return std::make_tuple(nullptr, storage_schema_version); + return std::make_tuple(nullptr, nullptr, storage_schema_version, false); }; - /// Try check and lock once. + /// Try get storage and lock once. + StoragePtr storage_; + TableStructureReadLockPtr lock; + Int64 storage_schema_version; + bool ok; { - auto [lock, storage_schema_version] = checkSchemaVersionAndLock(false); - if (lock) + std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false); + if (ok) { - LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK, no syncing required."); - return lock; + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK, no syncing required."); + storage = storage_; + table_lock = lock; + return; } - LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check not OK."); } - /// If first try failed, sync schema and check again. + /// If first try failed, sync schema and try again. { + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", not OK, syncing schemas."); context.getTMTContext().getSchemaSyncer()->syncSchemas(context); - auto [lock, storage_schema_version] = checkSchemaVersionAndLock(true); - if (lock) + std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true); + if (ok) { - LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK after syncing."); - return lock; + LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK after syncing."); + storage = storage_; + table_lock = lock; + return; } throw Exception("Shouldn't reach here", ErrorCodes::UNKNOWN_EXCEPTION); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index c0b007690d0..7e5541542bb 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace Poco { class Logger; } @@ -111,7 +112,7 @@ class InterpreterSelectQuery : public IInterpreter void init(const Names & required_result_column_names); - TableStructureReadLockPtr alignStorageSchemaAndLock(Int64 schema_version); + void getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version); void executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index c67999e6916..8c80ed0f990 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -27,7 +27,7 @@ struct Settings #define APPLY_FOR_SETTINGS(M) \ M(SettingString, regions, "", "the region need to be read.") \ - M(SettingBool, resolve_locks, false, "tmt read tso.") \ + M(SettingBool, resolve_locks, false, "tmt resolve locks.") \ M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \ M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \ diff --git a/tests/mutable-test/txn_schema/drop_on_read.test b/tests/mutable-test/txn_schema/drop_on_read.test new file mode 100644 index 00000000000..f0786c80dcd --- /dev/null +++ b/tests/mutable-test/txn_schema/drop_on_read.test @@ -0,0 +1,26 @@ +=> DBGInvoke __enable_schema_sync_service('false') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) +=> DBGInvoke __mock_schema_syncer('true') + +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __drop_tidb_table(default, test, 'false') +=> select * from default.test +=> select * from default.test " --schema_version "100 +Received exception from server (version {#WORD}): +Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. + +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)') +=> select * from default.test +Received exception from server (version {#WORD}): +Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. +=> select * from default.test " --schema_version "100 + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test +=> DBGInvoke __enable_schema_sync_service('true') From 11f261d64cfd77d4b648198dbaa84b072210e587 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Mon, 22 Jul 2019 18:23:27 +0800 Subject: [PATCH 21/23] Support rename mock tidb table --- dbms/src/Debug/DBGInvoker.cpp | 1 + dbms/src/Debug/MockTiDB.cpp | 17 +++++++++++++++++ dbms/src/Debug/MockTiDB.h | 2 ++ dbms/src/Debug/dbgFuncMockTiDBTable.cpp | 16 ++++++++++++++++ dbms/src/Debug/dbgFuncMockTiDBTable.h | 5 +++++ 5 files changed, 41 insertions(+) diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index ce80ba28ab1..7351cecff47 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -40,6 +40,7 @@ DBGInvoker::DBGInvoker() regFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable); regFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable); regFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable); + regFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable); regFunc("set_flush_threshold", dbgFuncSetFlushThreshold); diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index 6f2d4928840..d44e335d451 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -221,6 +221,23 @@ void MockTiDB::modifyColumnInTable(const String & database_name, const String & it->tp = column_info.tp; } +void MockTiDB::renameTable(const String & database_name, const String & table_name, const String & new_table_name) +{ + std::lock_guard lock(tables_mutex); + + TablePtr table = getTableByNameInternal(database_name, table_name); + String qualified_name = database_name + "." + table_name; + String new_qualified_name = database_name + "." + new_table_name; + + TableInfo new_table_info = table->table_info; + new_table_info.name = new_table_name; + auto new_table = std::make_shared
(database_name, new_table_name, std::move(new_table_info)); + + tables_by_id[new_table->table_info.id] = new_table; + tables_by_name.erase(qualified_name); + tables_by_name.emplace(new_qualified_name, new_table); +} + TablePtr MockTiDB::getTableByName(const String & database_name, const String & table_name) { std::lock_guard lock(tables_mutex); diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index 5adc0dce5eb..d1b6decd976 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -74,6 +74,8 @@ class MockTiDB : public ext::singleton void modifyColumnInTable(const String & database_name, const String & table_name, const NameAndTypePair & column); + void renameTable(const String & database_name, const String & table_name, const String & new_table_name); + TablePtr getTableByName(const String & database_name, const String & table_name); void traverseTables(std::function f); diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp index efd17c4dfef..9d856d90bd5 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp @@ -230,4 +230,20 @@ void MockTiDBTable::dbgFuncModifyColumnInTiDBTable(DB::Context & context, const output(ss.str()); } +void MockTiDBTable::dbgFuncRenameTiDBTable(Context & /*context*/, const ASTs & args, DBGInvoker::Printer output) +{ + if (args.size() != 3) + throw Exception("Args not matched, should be: database-name, table-name, new-table-name", ErrorCodes::BAD_ARGUMENTS); + + const String & database_name = typeid_cast(*args[0]).name; + const String & table_name = typeid_cast(*args[1]).name; + const String & new_table_name = typeid_cast(*args[2]).name; + + MockTiDB::instance().renameTable(database_name, table_name, new_table_name); + + std::stringstream ss; + ss << "renamed table " << database_name << "." << table_name << " to " << database_name << "." << new_table_name; + output(ss.str()); +} + } // namespace DB diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.h b/dbms/src/Debug/dbgFuncMockTiDBTable.h index 27414b5582d..570e42656c3 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.h +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.h @@ -48,6 +48,11 @@ struct MockTiDBTable // Usage: // ./storages-client.sh "DBGInvoke modify_column_in_tidb_table(database_name, table_name, 'col type')" static void dbgFuncModifyColumnInTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output); + + // Rename a TiDB table. + // Usage: + // ./storages-client.sh "DBGInvoke rename_tidb_table(database_name, table_name, new_table)" + static void dbgFuncRenameTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output); }; } // namespace DB From a689b8a5c1fe1221ff37257035ec1a7189c56705 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Mon, 22 Jul 2019 18:53:15 +0800 Subject: [PATCH 22/23] Add rename tests --- .../txn_schema/rename_on_read.test | 25 ++++++++++++++++ .../txn_schema/rename_on_write.test | 30 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 tests/mutable-test/txn_schema/rename_on_read.test create mode 100644 tests/mutable-test/txn_schema/rename_on_write.test diff --git a/tests/mutable-test/txn_schema/rename_on_read.test b/tests/mutable-test/txn_schema/rename_on_read.test new file mode 100644 index 00000000000..ab1b1148c0d --- /dev/null +++ b/tests/mutable-test/txn_schema/rename_on_read.test @@ -0,0 +1,25 @@ +=> DBGInvoke __enable_schema_sync_service('false') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __drop_tidb_table(default, test1) +=> drop table if exists default.test1 + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) +=> DBGInvoke __mock_schema_syncer('true') + +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String') +=> DBGInvoke __refresh_schemas() +=> select * from default.test +=> DBGInvoke __rename_tidb_table(default, test, test1) +=> select * from default.test +=> select * from default.test " --schema_version "100 +Received exception from server (version {#WORD}): +Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. +=> select * from default.test1 +=> select * from default.test1 " --schema_version "100 + +=> DBGInvoke __drop_tidb_table(default, test1) +=> drop table if exists default.test1 +=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/mutable-test/txn_schema/rename_on_write.test b/tests/mutable-test/txn_schema/rename_on_write.test new file mode 100644 index 00000000000..3b031c45f09 --- /dev/null +++ b/tests/mutable-test/txn_schema/rename_on_write.test @@ -0,0 +1,30 @@ +=> DBGInvoke __enable_schema_sync_service('false') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __drop_tidb_table(default, test1) +=> drop table if exists default.test1 + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) +=> DBGInvoke __mock_schema_syncer('true') + +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> select col_1 from default.test +=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 1) +=> DBGInvoke __rename_tidb_table(default, test, test1) +=> DBGInvoke __try_flush_region(4) +=> select * from default.test +Received exception from server (version {#WORD}): +Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. +=> select * from default.test1 +┌─col_1─┬─_tidb_rowid─┬─col_2─┐ +│ test1 │ 50 │ 1 │ +└───────┴─────────────┴───────┘ + +=> DBGInvoke __drop_tidb_table(default, test1) +=> drop table if exists default.test1 +=> DBGInvoke __enable_schema_sync_service('true') From 00bd504d21220db0f9decfc8b9ce6d611b181b52 Mon Sep 17 00:00:00 2001 From: zanmato1984 Date: Mon, 22 Jul 2019 20:25:03 +0800 Subject: [PATCH 23/23] Add mock truncate table and tests --- dbms/src/Debug/DBGInvoker.cpp | 1 + dbms/src/Debug/MockTiDB.cpp | 13 ++++++++++ dbms/src/Debug/MockTiDB.h | 2 ++ dbms/src/Debug/dbgFuncMockTiDBTable.cpp | 15 +++++++++++ dbms/src/Debug/dbgFuncMockTiDBTable.h | 5 ++++ .../txn_schema/truncate_on_read.test | 26 +++++++++++++++++++ .../txn_schema/truncate_on_write.test | 25 ++++++++++++++++++ 7 files changed, 87 insertions(+) create mode 100644 tests/mutable-test/txn_schema/truncate_on_read.test create mode 100644 tests/mutable-test/txn_schema/truncate_on_write.test diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 7351cecff47..230a9aff95b 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -41,6 +41,7 @@ DBGInvoker::DBGInvoker() regFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable); regFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable); regFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable); + regFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable); regFunc("set_flush_threshold", dbgFuncSetFlushThreshold); diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index d44e335d451..e6692ab59c1 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -238,6 +238,19 @@ void MockTiDB::renameTable(const String & database_name, const String & table_na tables_by_name.emplace(new_qualified_name, new_table); } +void MockTiDB::truncateTable(const String & database_name, const String & table_name) +{ + std::lock_guard lock(tables_mutex); + + TablePtr table = getTableByNameInternal(database_name, table_name); + + TableID old_table_id = table->table_info.id; + table->table_info.id += 1000; // Just big enough is OK. + + tables_by_id.erase(old_table_id); + tables_by_id.emplace(table->id(), table); +} + TablePtr MockTiDB::getTableByName(const String & database_name, const String & table_name) { std::lock_guard lock(tables_mutex); diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index d1b6decd976..f60c0fe8b59 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -76,6 +76,8 @@ class MockTiDB : public ext::singleton void renameTable(const String & database_name, const String & table_name, const String & new_table_name); + void truncateTable(const String & database_name, const String & table_name); + TablePtr getTableByName(const String & database_name, const String & table_name); void traverseTables(std::function f); diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp index 9d856d90bd5..2db846688b9 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp @@ -246,4 +246,19 @@ void MockTiDBTable::dbgFuncRenameTiDBTable(Context & /*context*/, const ASTs & a output(ss.str()); } +void MockTiDBTable::dbgFuncTruncateTiDBTable(Context & /*context*/, const ASTs & args, DBGInvoker::Printer output) +{ + if (args.size() != 2) + throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS); + + const String & database_name = typeid_cast(*args[0]).name; + const String & table_name = typeid_cast(*args[1]).name; + + MockTiDB::instance().truncateTable(database_name, table_name); + + std::stringstream ss; + ss << "truncated table " << database_name << "." << table_name; + output(ss.str()); +} + } // namespace DB diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.h b/dbms/src/Debug/dbgFuncMockTiDBTable.h index 570e42656c3..92b29a7589a 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.h +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.h @@ -53,6 +53,11 @@ struct MockTiDBTable // Usage: // ./storages-client.sh "DBGInvoke rename_tidb_table(database_name, table_name, new_table)" static void dbgFuncRenameTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output); + + // Truncate a TiDB table. + // Usage: + // ./storages-client.sh "DBGInvoke truncate_tidb_table(database_name, table_name)" + static void dbgFuncTruncateTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output); }; } // namespace DB diff --git a/tests/mutable-test/txn_schema/truncate_on_read.test b/tests/mutable-test/txn_schema/truncate_on_read.test new file mode 100644 index 00000000000..9aa469e118b --- /dev/null +++ b/tests/mutable-test/txn_schema/truncate_on_read.test @@ -0,0 +1,26 @@ +=> DBGInvoke __enable_schema_sync_service('false') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) +=> DBGInvoke __mock_schema_syncer('true') + +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1') +=> select * from default.test +┌─col_1─┬─_tidb_rowid─┐ +│ test1 │ 50 │ +└───────┴─────────────┘ +=> DBGInvoke __truncate_tidb_table(default, test) +=> select * from default.test +┌─col_1─┬─_tidb_rowid─┐ +│ test1 │ 50 │ +└───────┴─────────────┘ +=> select * from default.test " --schema_version "100 + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test +=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/mutable-test/txn_schema/truncate_on_write.test b/tests/mutable-test/txn_schema/truncate_on_write.test new file mode 100644 index 00000000000..62cb594151b --- /dev/null +++ b/tests/mutable-test/txn_schema/truncate_on_write.test @@ -0,0 +1,25 @@ +=> DBGInvoke __enable_schema_sync_service('false') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) +=> DBGInvoke __mock_schema_syncer('true') + +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1') +=> select col_1 from default.test +┌─col_1─┐ +│ test1 │ +└───────┘ +=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test1', 1) +=> DBGInvoke __truncate_tidb_table(default, test) +=> DBGInvoke __try_flush_region(4) +=> select * from default.test + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test +=> DBGInvoke __enable_schema_sync_service('true')