diff --git a/dbms/src/Databases/test/gtest_database.cpp b/dbms/src/Databases/test/gtest_database.cpp index 6b8bbc17348..b469a571570 100644 --- a/dbms/src/Databases/test/gtest_database.cpp +++ b/dbms/src/Databases/test/gtest_database.cpp @@ -273,18 +273,15 @@ try EXPECT_EQ(managed_storage->getDatabaseName(), db_name); } - const String to_tbl_name = "t_112"; + const String to_tbl_display_name = "tbl_test"; { // Rename table - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db, to_tbl_name, db_name, to_tbl_name); + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db, tbl_name, db_name, to_tbl_display_name); - auto old_storage = db->tryGetTable(ctx, tbl_name); - ASSERT_EQ(old_storage, nullptr); - - auto storage = db->tryGetTable(ctx, to_tbl_name); + auto storage = db->tryGetTable(ctx, tbl_name); ASSERT_NE(storage, nullptr); EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name); - EXPECT_EQ(storage->getTableName(), to_tbl_name); + EXPECT_EQ(storage->getTableName(), tbl_name); auto managed_storage = std::dynamic_pointer_cast(storage); EXPECT_EQ(managed_storage->getDatabaseName(), db_name); @@ -294,13 +291,13 @@ try // Drop table auto drop_query = std::make_shared(); drop_query->database = db_name; - drop_query->table = to_tbl_name; + drop_query->table = tbl_name; drop_query->if_exists = false; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, ctx); drop_interpreter.execute(); - auto storage = db->tryGetTable(ctx, to_tbl_name); + auto storage = db->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } @@ -391,18 +388,18 @@ try EXPECT_EQ(managed_storage->getDatabaseName(), db_name); } - const String to_tbl_name = "t_112"; + const String to_tbl_display_name = "tbl_test"; { // Rename table - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name); + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, to_tbl_display_name); auto old_storage = db->tryGetTable(ctx, tbl_name); ASSERT_EQ(old_storage, nullptr); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_NE(storage, nullptr); EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name); - EXPECT_EQ(storage->getTableName(), to_tbl_name); + EXPECT_EQ(storage->getTableName(), tbl_name); auto managed_storage = std::dynamic_pointer_cast(storage); EXPECT_EQ(managed_storage->getDatabaseName(), db2_name); @@ -412,13 +409,13 @@ try // Drop table auto drop_query = std::make_shared(); drop_query->database = db2_name; - drop_query->table = to_tbl_name; + drop_query->table = tbl_name; drop_query->if_exists = false; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, ctx); drop_interpreter.execute(); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } @@ -501,18 +498,17 @@ try EXPECT_FALSE(db->empty(ctx)); EXPECT_TRUE(db->isTableExist(ctx, tbl_name)); - const String to_tbl_name = "t_112"; // Rename table to another database, and mock crash by failed point FailPointHelper::enableFailPoint(FailPoints::exception_before_rename_table_old_meta_removed); ASSERT_THROW( - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name), + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, tbl_name), DB::Exception); { // After fail point triggled we should have both meta file in disk Poco::File old_meta_file{db->getTableMetadataPath(tbl_name)}; ASSERT_TRUE(old_meta_file.exists()); - Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name)); + Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name)); ASSERT_TRUE(new_meta_file.exists()); // Old table should remain in db auto old_storage = db->tryGetTable(ctx, tbl_name); @@ -527,10 +523,10 @@ try ThreadPool thread_pool(2); db2->loadTables(ctx, &thread_pool, true); - Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name)); + Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name)); ASSERT_FALSE(new_meta_file.exists()); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 523960e52a6..83e81b6017c 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -21,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +43,7 @@ #include #include +#include namespace ProfileEvents { @@ -196,7 +199,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, size_t rowkey_column_size_, const Settings & settings_) : global_context(db_context.getGlobalContext()) - , path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)) + , path_pool(std::make_shared(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))) , settings(settings_) , db_name(db_name_) , table_name(table_name_) @@ -216,7 +219,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, storage_pool = std::make_shared(global_context, ns_id, - path_pool, + *path_pool, db_name_ + "." + table_name_); // Restore existing dm files and set capacity for path_pool. @@ -296,25 +299,46 @@ DeltaMergeStore::~DeltaMergeStore() void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) { + // Callbacks for cleaning outdated DTFiles. Note that there is a chance + // that callbacks is called after the `DeltaMergeStore` dropped, we must + // make the callbacks safe. ExternalPageCallbacks callbacks; - // V2 callbacks for cleaning DTFiles - callbacks.scanner = [this]() { + callbacks.ns_id = storage_pool->getNamespaceId(); + callbacks.scanner = [path_pool_weak_ref = std::weak_ptr(path_pool), file_provider = global_context.getFileProvider()]() { ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec; - auto delegate = path_pool.getStableDiskDelegator(); + + // If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table, + // simply return an empty list is OK. + auto path_pool = path_pool_weak_ref.lock(); + if (!path_pool) + return path_and_ids_vec; + + // Return the DTFiles on disks. + auto delegate = path_pool->getStableDiskDelegator(); + // Only return the DTFiles can be GC. The page id of not able to be GC files, which is being ingested or in the middle of + // SegmentSplit/Merge/MergeDelta, is not yet applied + // to PageStorage is marked as not able to be GC, so we don't return them and run the `remover` DMFile::ListOptions options; options.only_list_can_gc = true; for (auto & root_path : delegate.listPaths()) { - auto & path_and_ids = path_and_ids_vec.emplace_back(); - path_and_ids.first = root_path; - auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, options); - for (auto id : file_ids_in_current_path) - path_and_ids.second.insert(id); + std::set ids_under_path; + auto file_ids_in_current_path = DMFile::listAllInPath(file_provider, root_path, options); + path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path)); } return path_and_ids_vec; }; - callbacks.remover = [this](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { - auto delegate = path_pool.getStableDiskDelegator(); + callbacks.remover = [path_pool_weak_ref = std::weak_ptr(path_pool), // + file_provider = global_context.getFileProvider(), + logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { + // If the StoragePathPool is invalid, meaning we call `remover` after dropping the table, + // simply skip is OK. + auto path_pool = path_pool_weak_ref.lock(); + if (!path_pool) + return; + + SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove"); + auto delegate = path_pool->getStableDiskDelegator(); for (const auto & [path, ids] : path_and_ids_vec) { for (auto id : ids) @@ -323,18 +347,50 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) continue; // Note that page_id is useless here. - auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); - if (dmfile->canGC()) + auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); + if (unlikely(!dmfile)) { - delegate.removeDTFile(dmfile->fileId()); - dmfile->remove(global_context.getFileProvider()); + // If the dtfile directory is not exist, it means `StoragePathPool::drop` have been + // called in another thread. Just try to clean if any id is left. + try + { + delegate.removeDTFile(id); + } + catch (DB::Exception & e) + { + // just ignore + } + LOG_FMT_INFO(logger, + "GC try remove useless DM file, but file not found and may have been removed, dmfile={}", + DMFile::getPathByStatus(path, id, DMFile::Status::READABLE)); + } + else if (dmfile->canGC()) + { + // StoragePathPool::drop may be called concurrently, ignore and continue next file if any exception thrown + String err_msg; + try + { + // scanner should only return dtfiles that can GC, + // just another check here. + delegate.removeDTFile(dmfile->fileId()); + dmfile->remove(file_provider); + } + catch (DB::Exception & e) + { + err_msg = e.message(); + } + catch (Poco::Exception & e) + { + err_msg = e.message(); + } + if (err_msg.empty()) + LOG_FMT_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path()); + else + LOG_FMT_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={}, err_msg={}", dmfile->path(), err_msg); } - - LOG_FMT_INFO(log, "GC removed useless dmfile: {}", dmfile->path()); } } }; - callbacks.ns_id = storage_pool->getNamespaceId(); // remember to unregister it when shutdown storage_pool->dataRegisterExternalPagesCallbacks(callbacks); storage_pool->enableGC(); @@ -355,20 +411,9 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) blockable_background_pool_handle->wake(); } -void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_database_name, String new_table_name) +void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, String new_table_name) { - if (clean_rename) - { - path_pool.rename(new_database_name, new_table_name, clean_rename); - } - else - { - LOG_FMT_WARNING(log, "Applying heavy renaming for table {}.{} to {}.{}", db_name, table_name, new_database_name, new_table_name); - - // Remove all background task first - shutdown(); - path_pool.rename(new_database_name, new_table_name, clean_rename); // rename for multi-disk - } + path_pool->rename(new_database_name, new_table_name); // TODO: replacing these two variables is not atomic, but could be good enough? table_name.swap(new_table_name); @@ -465,7 +510,7 @@ void DeltaMergeStore::drop() storage_pool->drop(); // Drop data in storage path pool - path_pool.drop(/*recursive=*/true, /*must_success=*/false); + path_pool->drop(/*recursive=*/true, /*must_success=*/false); LOG_FMT_INFO(log, "Drop DeltaMerge done [{}.{}]", db_name, table_name); } @@ -496,7 +541,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB: // Because db_context could be a temporary object and won't last long enough during the query process. // Like the context created by InterpreterSelectWithUnionQuery. auto * ctx = new DMContext(db_context.getGlobalContext(), - path_pool, + *path_pool, *storage_pool, latest_gc_safe_point.load(std::memory_order_acquire), settings.not_compress_columns, @@ -704,7 +749,7 @@ std::tuple DeltaMergeStore::preAllocateIngestFile() if (shutdown_called.load(std::memory_order_relaxed)) return {}; - auto delegator = path_pool.getStableDiskDelegator(); + auto delegator = path_pool->getStableDiskDelegator(); auto parent_path = delegator.choosePath(); auto new_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); return {parent_path, new_id}; @@ -715,7 +760,7 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil if (shutdown_called.load(std::memory_order_relaxed)) return; - auto delegator = path_pool.getStableDiskDelegator(); + auto delegator = path_pool->getStableDiskDelegator(); delegator.addDTFile(file_id, file_size, parent_path); } @@ -2529,7 +2574,7 @@ void DeltaMergeStore::restoreStableFiles() options.only_list_can_gc = false; // We need all files to restore the bytes on disk options.clean_up = true; auto file_provider = global_context.getFileProvider(); - auto path_delegate = path_pool.getStableDiskDelegator(); + auto path_delegate = path_pool->getStableDiskDelegator(); for (const auto & root_path : path_delegate.listPaths()) { for (const auto & file_id : DMFile::listAllInPath(file_provider, root_path, options)) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 769786ce070..3dc9fca4f08 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -314,7 +314,7 @@ class DeltaMergeStore : private boost::noncopyable return table_name; } - void rename(String new_path, bool clean_rename, String new_database_name, String new_table_name); + void rename(String new_path, String new_database_name, String new_table_name); void clearData(); @@ -529,7 +529,7 @@ class DeltaMergeStore : private boost::noncopyable #endif Context & global_context; - StoragePathPool path_pool; + std::shared_ptr path_pool; Settings settings; StoragePoolPtr storage_pool; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index fe984ad519f..e05d98a15bd 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -151,7 +151,12 @@ DMFilePtr DMFile::restore( const ReadMetaMode & read_meta_mode) { String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); - bool single_file_mode = Poco::File(path).isFile(); + // The path may be dropped by another thread in some cases + auto poco_file = Poco::File(path); + if (!poco_file.exists()) + return nullptr; + + bool single_file_mode = poco_file.isFile(); DMFilePtr dmfile(new DMFile( file_id, page_id, diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index c9ba3943a60..fbe6ab681d8 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -135,7 +135,7 @@ FileUsageStatistics GlobalStoragePool::getLogFileUsage() const bool GlobalStoragePool::gc() { - return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD); + return gc(global_context.getSettingsRef(), /*immediately=*/true, DELTA_MERGE_GC_PERIOD); } bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period) @@ -445,7 +445,7 @@ PageStorageRunMode StoragePool::restore() } else { - LOG_FMT_INFO(logger, "Current pool.meta translate already done before restored [ns_id={}] ", ns_id); + LOG_FMT_INFO(logger, "Current pool.meta transform already done before restored [ns_id={}] ", ns_id); } if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0) @@ -461,7 +461,7 @@ PageStorageRunMode StoragePool::restore() } else { - LOG_FMT_INFO(logger, "Current pool.data translate already done before restored [ns_id={}]", ns_id); + LOG_FMT_INFO(logger, "Current pool.data transform already done before restored [ns_id={}]", ns_id); } // Check number of valid pages in v2 diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 471d59f760c..f3d1daa739a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -17,19 +17,26 @@ #include #include #include +#include #include +#include #include #include +#include #include #include #include #include +#include +#include #include #include #include #include +#include "Storages/DeltaMerge/RowKeyRange.h" + namespace DB { namespace FailPoints @@ -84,8 +91,126 @@ try // check column structure of store const auto & cols = store->getTableColumns(); // version & tag column added - ASSERT_EQ(cols.size(), 3UL); + ASSERT_EQ(cols.size(), 3); + } +} +CATCH + +TEST_F(DeltaMergeStoreTest, DroppedInMiddleDTFileGC) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + { + // check column structure of store + const auto & cols = store->getTableColumns(); + // version & tag column added + ASSERT_EQ(cols.size(), 3); } + + // drop table in the middle of page storage gc + store->shutdown(); + store = nullptr; + + sp_gc.next(); // continue the page storage gc + th_gc.wait(); +} +CATCH + +TEST_F(DeltaMergeStoreTest, DroppedInMiddleDTFileRemoveCallback) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before removing dtfiles + auto sp_gc = SyncPointCtl::enableInScope("before_DeltaMergeStore::callbacks_remover_remove"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + { + // check column structure of store + const auto & cols = store->getTableColumns(); + // version & tag column added + ASSERT_EQ(cols.size(), 3); + } + + // drop table and files in the middle of page storage gc + store->drop(); + store = nullptr; + + sp_gc.next(); // continue removing dtfiles + th_gc.wait(); +} +CATCH + +TEST_F(DeltaMergeStoreTest, CreateInMiddleDTFileGC) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + DeltaMergeStorePtr new_store; + ColumnDefinesPtr new_cols; + { + new_cols = DMTestEnv::getDefaultColumns(); + ColumnDefine handle_column_define = (*new_cols)[0]; + new_store = std::make_shared(*db_context, + false, + "test", + "t_200", + 200, + *new_cols, + handle_column_define, + false, + 1, + DeltaMergeStore::Settings()); + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 100, false); + new_store->write(*db_context, db_context->getSettingsRef(), block); + new_store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + } + + sp_gc.next(); // continue the page storage gc + th_gc.wait(); + + BlockInputStreamPtr in = new_store->read(*db_context, + db_context->getSettingsRef(), + *new_cols, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + "", + /* keep_order= */ false, + /* is_fast_mode= */ false, + /* expected_block_size= */ 1024)[0]; + ASSERT_INPUTSTREAM_NROWS(in, 100); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 5f8c098a8b1..a34a5125e28 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -67,7 +67,7 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic DeltaMergeStorePtr s = std::make_shared(*db_context, false, "test", - "DeltaMergeStoreTest", + "t_100", 100, *cols, handle_column_define, @@ -135,7 +135,7 @@ class DeltaMergeStoreRWTest DeltaMergeStorePtr s = std::make_shared(*db_context, false, "test", - "DeltaMergeStoreRWTest", + "t_101", 101, *cols, handle_column_define, @@ -180,4 +180,4 @@ class DeltaMergeStoreRWTest }; } // namespace tests } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index ac03b509f18..4406cf06289 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -207,10 +207,12 @@ try // Rename database name before store object is created. const String new_db_name = "new_" + storage->getDatabaseName(); - storage->rename(path_name, new_db_name, table_name, table_name); + const String new_display_table_name = "new_" + storage->getTableName(); + storage->rename(path_name, new_db_name, table_name, new_display_table_name); ASSERT_FALSE(storage->storeInited()); ASSERT_EQ(storage->getTableName(), table_name); ASSERT_EQ(storage->getDatabaseName(), new_db_name); + ASSERT_EQ(storage->getTableInfo().name, new_display_table_name); // prepare block data Block sample; @@ -231,9 +233,8 @@ try } // TiFlash always use t_{table_id} as table name - String new_table_name = storage->getTableName(); - storage->rename(path_name, new_db_name, new_table_name, new_table_name); - ASSERT_EQ(storage->getTableName(), new_table_name); + storage->rename(path_name, new_db_name, table_name, table_name); + ASSERT_EQ(storage->getTableName(), table_name); ASSERT_EQ(storage->getDatabaseName(), new_db_name); storage->drop(); diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index b77656785a7..7468c4fde44 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -68,13 +68,13 @@ enum class PageStorageRunMode : UInt8 struct ExternalPageCallbacks { - // `scanner` for scanning avaliable external page ids on disks. + // `scanner` for scanning available external page ids on disks. // `remover` will be called with living normal page ids after gc run a round, user should remove those // external pages(files) in `pending_external_pages` but not in `valid_normal_pages` using PathAndIdsVec = std::vector>>; using ExternalPagesScanner = std::function; using ExternalPagesRemover - = std::function & valid_normal_pages)>; + = std::function & valid_normal_pages)>; ExternalPagesScanner scanner = nullptr; ExternalPagesRemover remover = nullptr; NamespaceId ns_id = MAX_NAMESPACE_ID; @@ -336,6 +336,7 @@ class PageStorage : private boost::noncopyable } // Register and unregister external pages GC callbacks + // Note that user must ensure that it is safe to call `scanner` and `remover` even after unregister. virtual void registerExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) = 0; virtual void unregisterExternalPagesCallbacks(NamespaceId /*ns_id*/){}; diff --git a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h index 26ff9c109f4..51c118e9172 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h +++ b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h @@ -17,6 +17,10 @@ #include #include +#include +#include +#include +#include #include namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index dba1fef7566..67611dc980b 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -25,6 +26,8 @@ #include #include +#include + namespace DB { namespace ErrorCodes @@ -336,24 +339,55 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li // TODO: `clean_external_page` for all tables may slow down the whole gc process when there are lots of table. void PageStorageImpl::cleanExternalPage(Stopwatch & gc_watch, GCTimeStatistics & statistics) { - // TODO: `callbacks_mutex` is being held during the whole `cleanExternalPage`, meaning gc will block - // creating/dropping table, need to refine it later. - std::scoped_lock lock{callbacks_mutex}; - statistics.num_external_callbacks = callbacks_container.size(); - if (!callbacks_container.empty()) + // Fine grained lock on `callbacks_mutex`. + // So that adding/removing a storage will not be blocked for the whole + // processing time of `cleanExternalPage`. + std::shared_ptr ns_callbacks; + { + std::scoped_lock lock{callbacks_mutex}; + // check and get the begin iter + statistics.num_external_callbacks = callbacks_container.size(); + auto iter = callbacks_container.begin(); + if (iter == callbacks_container.end()) // empty + { + statistics.clean_external_page_ms = gc_watch.elapsedMillisecondsFromLastTime(); + return; + } + + assert(iter != callbacks_container.end()); // early exit in the previous code + // keep the shared_ptr so that erasing ns_id from PageStorage won't invalid the `ns_callbacks` + ns_callbacks = iter->second; + } + + Stopwatch external_watch; + + SYNC_FOR("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + + while (true) { - Stopwatch external_watch; - for (const auto & [ns_id, callbacks] : callbacks_container) + // 1. Note that we must call `scanner` before `getAliveExternalIds`. + // Or some committed external ids is not included in `alive_ids` + // but exist in `pending_external_pages`. They will be removed by + // accident with `remover` under this situation. + // 2. Assume calling the callbacks after erasing ns_is is safe. + + // the external pages on disks. + auto pending_external_pages = ns_callbacks->scanner(); + statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); + auto alive_external_ids = page_directory->getAliveExternalIds(ns_callbacks->ns_id); + statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); + // remove the external pages that is not alive now. + ns_callbacks->remover(pending_external_pages, alive_external_ids); + statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + + // move to next namespace callbacks { - // Note that we must call `scanner` before `getAliveExternalIds` - // Or some committed external ids is not included and we may - // remove the external page by accident with `remover`. - const auto pending_external_pages = callbacks.scanner(); - statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); - const auto alive_external_ids = page_directory->getAliveExternalIds(ns_id); - statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); - callbacks.remover(pending_external_pages, alive_external_ids); - statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + std::scoped_lock lock{callbacks_mutex}; + // next ns_id that is greater than `ns_id` + auto iter = callbacks_container.upper_bound(ns_callbacks->ns_id); + if (iter == callbacks_container.end()) + break; + ns_callbacks = iter->second; } } @@ -445,8 +479,13 @@ void PageStorageImpl::registerExternalPagesCallbacks(const ExternalPageCallbacks assert(callbacks.scanner != nullptr); assert(callbacks.remover != nullptr); assert(callbacks.ns_id != MAX_NAMESPACE_ID); - assert(callbacks_container.count(callbacks.ns_id) == 0); - callbacks_container.emplace(callbacks.ns_id, callbacks); + // NamespaceId(TableID) should not be reuse + RUNTIME_CHECK_MSG( + callbacks_container.count(callbacks.ns_id) == 0, + "Try to create callbacks for duplicated namespace id {}", + callbacks.ns_id); + // `emplace` won't invalid other iterator + callbacks_container.emplace(callbacks.ns_id, std::make_shared(callbacks)); } void PageStorageImpl::unregisterExternalPagesCallbacks(NamespaceId ns_id) diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index 9bce2e5dde8..321c9742f66 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -15,6 +15,8 @@ #pragma once #include +#include +#include #include #include #include @@ -172,7 +174,8 @@ class PageStorageImpl : public DB::PageStorage const static String manifests_file_name; std::mutex callbacks_mutex; - using ExternalPageCallbacksContainer = std::unordered_map; + // Only std::map not std::unordered_map. We need insert/erase do not invalid other iterators. + using ExternalPageCallbacksContainer = std::map>; ExternalPageCallbacksContainer callbacks_container; }; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 0ccdd1d9b2c..0c86fbb52ec 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -13,6 +13,7 @@ // limitations under the License. +#include #include #include #include @@ -35,6 +36,9 @@ #include #include +#include +#include + namespace DB { namespace FailPoints @@ -1238,6 +1242,104 @@ try } CATCH +TEST_F(PageStorageTest, ConcurrencyAddExtCallbacks) +try +{ + auto ptr = std::make_shared(100); // mock the `StorageDeltaMerge` + ExternalPageCallbacks callbacks; + callbacks.ns_id = TEST_NAMESPACE_ID; + callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + + (*ptr) += 1; // mock access the storage inside callback + }; + page_storage->registerExternalPagesCallbacks(callbacks); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + page_storage->gcImpl(/*not_skip*/ true, nullptr, nullptr); + }); + sp_gc.waitAndPause(); + + // mock table created while gc is running + { + ExternalPageCallbacks new_callbacks; + new_callbacks.ns_id = TEST_NAMESPACE_ID + 1; + new_callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + new_callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + + (*ptr) += 1; // mock access the storage inside callback + }; + page_storage->registerExternalPagesCallbacks(new_callbacks); + } + + sp_gc.next(); // continue the gc + th_gc.wait(); + + ASSERT_EQ(*ptr, 100 + 4); +} +CATCH + +TEST_F(PageStorageTest, ConcurrencyRemoveExtCallbacks) +try +{ + auto ptr = std::make_shared(100); // mock the `StorageDeltaMerge` + ExternalPageCallbacks callbacks; + callbacks.ns_id = TEST_NAMESPACE_ID; + callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + + (*ptr) += 1; // mock access the storage inside callback + }; + page_storage->registerExternalPagesCallbacks(callbacks); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + page_storage->gcImpl(/*not_skip*/ true, nullptr, nullptr); + }); + sp_gc.waitAndPause(); + + // mock table dropped while gc is running + page_storage->unregisterExternalPagesCallbacks(TEST_NAMESPACE_ID); + ptr = nullptr; + + sp_gc.next(); // continue the gc + th_gc.wait(); +} +CATCH + TEST_F(PageStorageTest, GcReuseSpaceThenRestore) try { diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 2e7edd7435b..465874564f1 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -228,53 +228,15 @@ void StoragePathPool::clearPSV2ObsoleteData() drop_instance_data("data"); } -void StoragePathPool::rename(const String & new_database, const String & new_table, bool clean_rename) +void StoragePathPool::rename(const String & new_database, const String & new_table) { - if (unlikely(new_database.empty() || new_table.empty())) - throw Exception(fmt::format("Can not rename for PathPool to {}.{}", new_database, new_table)); + RUNTIME_CHECK(!new_database.empty() && !new_table.empty(), new_database, new_table); + RUNTIME_CHECK(!path_need_database_name); - if (likely(clean_rename)) - { - // caller ensure that no path need to be renamed. - if (unlikely(path_need_database_name)) - throw Exception("Can not do clean rename with path_need_database_name is true!"); - - std::lock_guard lock{mutex}; - database = new_database; - table = new_table; - } - else - { - if (unlikely(file_provider->isEncryptionEnabled())) - throw Exception("Encryption is only supported when using clean_rename"); - - // Note: changing these path is not atomic, we may lost data if process is crash here. - std::lock_guard lock{mutex}; - // Get root path without database and table - for (auto & info : main_path_infos) - { - Poco::Path p(info.path); - p = p.parent().parent(); - if (path_need_database_name) - p = p.parent(); - auto new_path = getStorePath(p.toString() + "/data", new_database, new_table); - renamePath(info.path, new_path); - info.path = new_path; - } - for (auto & info : latest_path_infos) - { - Poco::Path p(info.path); - p = p.parent().parent(); - if (path_need_database_name) - p = p.parent(); - auto new_path = getStorePath(p.toString() + "/data", new_database, new_table); - renamePath(info.path, new_path); - info.path = new_path; - } - - database = new_database; - table = new_table; - } + // The directories for storing table data is not changed, just rename related names. + std::lock_guard lock{mutex}; + database = new_database; + table = new_table; } void StoragePathPool::drop(bool recursive, bool must_success) @@ -297,6 +259,8 @@ void StoragePathPool::drop(bool recursive, bool must_success) total_bytes += file_size; } global_capacity->freeUsedSize(path_info.path, total_bytes); + // clear in case delegator->removeDTFile is called after `drop` + dt_file_path_map.clear(); } } catch (Poco::DirectoryNotEmptyException & e) diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 4f16511feff..59d8f463e4b 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -412,7 +412,7 @@ class StoragePathPool void clearPSV2ObsoleteData(); - void rename(const String & new_database, const String & new_table, bool clean_rename); + void rename(const String & new_database, const String & new_table); void drop(bool recursive, bool must_success = true); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 6eb6a16736b..8c9f115a472 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1195,62 +1195,31 @@ void StorageDeltaMerge::rename( const String & new_display_table_name) { tidb_table_info.name = new_display_table_name; // update name in table info - // For DatabaseTiFlash, simply update store's database is OK. - // `store->getTableName() == new_table_name` only keep for mock test. - bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; - if (likely(clean_rename)) { - if (storeInited()) - { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); - return; - } - std::lock_guard lock(store_mutex); - if (storeInited()) - { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); - } - else - { - table_column_info->db_name = new_database_name; - table_column_info->table_name = new_table_name; - } + // For DatabaseTiFlash, simply update store's database is OK. + // `store->getTableName() == new_table_name` only keep for mock test. + bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; + RUNTIME_ASSERT(clean_rename, + log, + "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", + new_database_name, + new_table_name); + } + if (storeInited()) + { + _store->rename(new_path_to_db, new_database_name, new_table_name); return; } - - /// Note that this routine is only left for CI tests. `clean_rename` should always be true in production env. - auto & store = getAndMaybeInitStore(); - - // For DatabaseOrdinary, we need to rename data path, then recreate a new store. - const String new_path = new_path_to_db + "/" + new_table_name; - - if (Poco::File{new_path}.exists()) - throw Exception( - fmt::format("Target path already exists: {}", new_path), - ErrorCodes::DIRECTORY_ALREADY_EXISTS); - - // flush store and then reset store to new path - store->flushCache(global_context, RowKeyRange::newAll(is_common_handle, rowkey_column_size)); - ColumnDefines table_column_defines = store->getTableColumns(); - ColumnDefine handle_column_define = store->getHandle(); - DeltaMergeStore::Settings settings = store->getSettings(); - - // remove background tasks - store->shutdown(); - // rename directories for multi disks - store->rename(new_path, clean_rename, new_database_name, new_table_name); - // generate a new store - store = std::make_shared( - global_context, - data_path_contains_database_name, - new_database_name, - new_table_name, - tidb_table_info.id, - std::move(table_column_defines), - std::move(handle_column_define), - is_common_handle, - rowkey_column_size, - settings); + std::lock_guard lock(store_mutex); + if (storeInited()) + { + _store->rename(new_path_to_db, new_database_name, new_table_name); + } + else + { + table_column_info->db_name = new_database_name; + table_column_info->table_name = new_table_name; + } } String StorageDeltaMerge::getTableName() const diff --git a/tests/delta-merge-test/ddl/alter.test b/tests/delta-merge-test/ddl/alter.test index 4bf405ac9e1..c058e5cdc81 100644 --- a/tests/delta-merge-test/ddl/alter.test +++ b/tests/delta-merge-test/ddl/alter.test @@ -72,21 +72,5 @@ └───┴──────┴───────┴──────┘ -## rename table ->> drop table if exists dm_test_renamed ->> rename table dm_test to dm_test_renamed ->> select * from dm_test -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.dm_test doesn't exist.. - ->> select * from dm_test_renamed -┌─a─┬────b─┬─────c─┬────d─┐ -│ 1 │ 0 │ 0 │ \N │ -│ 2 │ 1024 │ 65535 │ 4096 │ -│ 3 │ 2048 │ 65536 │ \N │ -└───┴──────┴───────┴──────┘ - - ## Clean up >> drop table if exists dm_test ->> drop table if exists dm_test_renamed diff --git a/tests/delta-merge-test/raft/schema/partition_table_restart.test b/tests/delta-merge-test/raft/schema/partition_table_restart.test index 893bb617af4..c7a5e488111 100644 --- a/tests/delta-merge-test/raft/schema/partition_table_restart.test +++ b/tests/delta-merge-test/raft/schema/partition_table_restart.test @@ -15,16 +15,11 @@ => DBGInvoke __enable_schema_sync_service('false') => DBGInvoke __drop_tidb_table(default, test) -=> DBGInvoke __drop_tidb_table(default, test1) => DBGInvoke __refresh_schemas() => drop table if exists default.test => drop table if exists default.test_9999 => drop table if exists default.test_9998 => drop table if exists default.test_9997 -=> drop table if exists default.test1 -=> drop table if exists default.test1_9999 -=> drop table if exists default.test1_9998 -=> drop table if exists default.test1_9997 => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64', '', 'dt') => DBGInvoke __mock_tidb_partition(default, test, 9999) @@ -38,38 +33,23 @@ => DBGInvoke __reset_schemas() => DBGInvoke __add_column_to_tidb_table(default, test, 'col_3 Nullable(Int8)') -=> DBGInvoke __rename_tidb_table(default, test, test1) => DBGInvoke __refresh_schemas() - -=> show tables -┌─name───────┐ -│ test1 │ -│ test1_9997 │ -│ test1_9998 │ -│ test1_9999 │ -└────────────┘ -=> select col_2 from default.test1_9997 -=> select * from default.test_9997 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test_9997 doesn't exist.. -=> select * from default.test_9998 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test_9998 doesn't exist.. +=> select col_2 from default.test_9997 => DBGInvoke __reset_schemas() -=> DBGInvoke __drop_tidb_partition(default, test1, 9997) +=> DBGInvoke __drop_tidb_partition(default, test, 9997) => DBGInvoke __refresh_schemas() -=> DBGInvoke is_tombstone(default, test1_9997) -┌─is_tombstone(default, test1_9997)─┐ +=> DBGInvoke is_tombstone(default, test_9997) +┌─is_tombstone(default, test_9997)─┐ │ true │ └───────────────────────────────────┘ -=> select * from default.test1_9999 +=> select * from default.test_9999 -=> DBGInvoke __drop_tidb_table(default, test1) +=> DBGInvoke __drop_tidb_table(default, test) => DBGInvoke __refresh_schemas() -=> drop table if exists default.test1 -=> drop table if exists default.test1_9999 -=> drop table if exists default.test1_9998 -=> drop table if exists default.test1_9997 +=> drop table if exists default.test +=> drop table if exists default.test_9999 +=> drop table if exists default.test_9998 +=> drop table if exists default.test_9997 => DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_on_read.test b/tests/delta-merge-test/raft/schema/rename_on_read.test deleted file mode 100644 index 40eb66277a9..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_on_read.test +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, test) -=> drop table if exists default.test - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - -=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String', '', 'dt') -=> DBGInvoke __refresh_schemas() -=> select * from default.test -=> DBGInvoke __rename_tidb_table(default, test, test1) -=> select * from default.test -=> select * from default.test " --schema_version "1000000 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. -=> select * from default.test1 -=> select * from default.test1 " --schema_version "1000000 - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 -=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_on_write.test b/tests/delta-merge-test/raft/schema/rename_on_write.test deleted file mode 100644 index 6ad2c809ce6..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_on_write.test +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#TODO: We can not mock this situation, ignore for now -#RETURN - -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, test) -=> drop table if exists default.test - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - -=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String', '', 'dt') -=> DBGInvoke __refresh_schemas() -=> DBGInvoke __put_region(4, 0, 100, default, test) -=> select col_1 from default.test -=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') -=> DBGInvoke __rename_tidb_table(default, test, test1) -#For DeltaTree, each write will trigger schema sync. -=> DBGInvoke __raft_insert_row(default, test1, 4, 50, 'test1', 1) -=> select * from default.test -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. -=> select * from default.test1 -┌─col_1─┬─_tidb_rowid─┬─col_2─┐ -│ test1 │ 50 │ 1 │ -└───────┴─────────────┴───────┘ - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 -=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_tables.test b/tests/delta-merge-test/raft/schema/rename_tables.test deleted file mode 100644 index 7c65d46d3e3..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_tables.test +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Preparation. -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, t1) -=> DBGInvoke __drop_tidb_table(default, t2) -=> drop table if exists default.t1 -=> drop table if exists default.t2 -=> DBGInvoke __refresh_schemas() - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - - -=> DBGInvoke __create_tidb_tables(default, t1, t2) -# rename table -=> DBGInvoke __rename_tidb_tables(default, t1, r1, default, t2, r2) -=> DBGInvoke __refresh_schemas() -=> select database,name,engine from system.tables where database='default' and name='r1' -┌─database─┬─name─┬─engine─────┐ -│ default │ r1 │ DeltaMerge │ -└──────────┴──────┴────────────┘ -=> select database,name,engine from system.tables where database='default' and name='r2' -┌─database─┬─name─┬─engine─────┐ -│ default │ r2 │ DeltaMerge │ -└──────────┴──────┴────────────┘ - -# clean -=> DBGInvoke __drop_tidb_table(default, r1) -=> DBGInvoke __drop_tidb_table(default, r2) -=> drop table if exists default.r1 -=> drop table if exists default.r2 -=> DBGInvoke __enable_schema_sync_service('true') \ No newline at end of file diff --git a/tests/delta-merge-test/raft/txn_mock/partition_table.test b/tests/delta-merge-test/raft/txn_mock/partition_table.test index 2f8e67a61a8..84b8044260f 100644 --- a/tests/delta-merge-test/raft/txn_mock/partition_table.test +++ b/tests/delta-merge-test/raft/txn_mock/partition_table.test @@ -17,12 +17,9 @@ => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test -=> drop table if exists default.test1 => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 -=> drop table if exists default.test1_9997 -=> drop table if exists default.test1_9999 => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') => DBGInvoke __mock_tidb_partition(default, test, 9999) @@ -94,27 +91,17 @@ │ 0 │ └──────────────┘ -=> DBGInvoke __rename_tidb_table(default, test, test1) -=> DBGInvoke __refresh_schemas() -=> select count(*) from default.test1_9997 -┌─count()─┐ -│ 2 │ -└─────────┘ - -=> DBGInvoke __drop_tidb_table(default, test1) +=> DBGInvoke __drop_tidb_table(default, test) => DBGInvoke __refresh_schemas() -=> DBGInvoke is_tombstone(default, test1_9999) +=> DBGInvoke is_tombstone(default, test_9999) ┌─is_tombstone(default, test_9999)─┐ │ true │ └──────────────────────────────────┘ => drop table if exists default.test -=> drop table if exists default.test1 => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 -=> drop table if exists default.test1_9997 -=> drop table if exists default.test1_9999 => DBGInvoke __enable_schema_sync_service('true') => DBGInvoke __clean_up_region()