diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 26e950d7798..5d967b388d0 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -162,7 +162,7 @@ struct ContextShared PathCapacityMetricsPtr path_capacity_ptr; /// Path capacity metrics FileProviderPtr file_provider; /// File provider. IORateLimiter io_rate_limiter; - PageStorageRunMode storage_run_mode; + PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3; DM::GlobalStoragePoolPtr global_storage_pool; /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests. @@ -714,7 +714,7 @@ Dependencies Context::getDependencies(const String & database_name, const String checkDatabaseAccessRightsImpl(db); } - ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name)); + auto iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name)); if (iter == shared->view_dependencies.end()) return {}; @@ -728,7 +728,7 @@ bool Context::isTableExist(const String & database_name, const String & table_na String db = resolveDatabase(database_name, current_database); checkDatabaseAccessRightsImpl(db); - Databases::const_iterator it = shared->databases.find(db); + auto it = shared->databases.find(db); return shared->databases.end() != it && it->second->isTableExist(*this, table_name); } @@ -754,7 +754,7 @@ void Context::assertTableExists(const String & database_name, const String & tab String db = resolveDatabase(database_name, current_database); checkDatabaseAccessRightsImpl(db); - Databases::const_iterator it = shared->databases.find(db); + auto it = shared->databases.find(db); if (shared->databases.end() == it) throw Exception(fmt::format("Database {} doesn't exist", backQuoteIfNeed(db)), ErrorCodes::UNKNOWN_DATABASE); @@ -771,7 +771,7 @@ void Context::assertTableDoesntExist(const String & database_name, const String if (check_database_access_rights) checkDatabaseAccessRightsImpl(db); - Databases::const_iterator it = shared->databases.find(db); + auto it = shared->databases.find(db); if (shared->databases.end() != it && it->second->isTableExist(*this, table_name)) throw Exception(fmt::format("Table {}.{} already exists.", backQuoteIfNeed(db), backQuoteIfNeed(table_name)), ErrorCodes::TABLE_ALREADY_EXISTS); } @@ -826,7 +826,7 @@ Tables Context::getExternalTables() const StoragePtr Context::tryGetExternalTable(const String & table_name) const { - TableAndCreateASTs::const_iterator jt = external_tables.find(table_name); + auto jt = external_tables.find(table_name); if (external_tables.end() == jt) return StoragePtr(); @@ -864,7 +864,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta String db = resolveDatabase(database_name, current_database); checkDatabaseAccessRightsImpl(db); - Databases::const_iterator it = shared->databases.find(db); + auto it = shared->databases.find(db); if (shared->databases.end() == it) { if (exception) @@ -894,7 +894,7 @@ void Context::addExternalTable(const String & table_name, const StoragePtr & sto StoragePtr Context::tryRemoveExternalTable(const String & table_name) { - TableAndCreateASTs::const_iterator it = external_tables.find(table_name); + auto it = external_tables.find(table_name); if (external_tables.end() == it) return StoragePtr(); @@ -954,7 +954,7 @@ std::unique_ptr Context::getDDLGuardIfTableDoesntExist(const String & { auto lock = getLock(); - Databases::const_iterator it = shared->databases.find(database); + auto it = shared->databases.find(database); if (shared->databases.end() != it && it->second->isTableExist(*this, table)) return {}; @@ -993,7 +993,7 @@ ASTPtr Context::getCreateTableQuery(const String & database_name, const String & ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const { - TableAndCreateASTs::const_iterator jt = external_tables.find(table_name); + auto jt = external_tables.find(table_name); if (external_tables.end() == jt) throw Exception(fmt::format("Temporary table {} doesn't exist", backQuoteIfNeed(table_name)), ErrorCodes::UNKNOWN_TABLE); @@ -1088,7 +1088,7 @@ void Context::setCurrentQueryId(const String & query_id) UInt64 a; UInt64 b; }; - } random; + } random{}; { auto lock = getLock(); @@ -1650,9 +1650,8 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool) auto lock = getLock(); if (shared->global_storage_pool) { - // Can't init GlobalStoragePool twice. - // otherwise the pagestorage instances in `StoragePool` for each table won't be updated and cause unexpected problem. - throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR); + // GlobalStoragePool may be initialized many times in some test cases for restore. + LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized."); } CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast(shared->storage_run_mode)); if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3) diff --git a/dbms/src/Server/tests/gtest_server_config.cpp b/dbms/src/Server/tests/gtest_server_config.cpp index 53705f1a351..cf53a8d6c18 100644 --- a/dbms/src/Server/tests/gtest_server_config.cpp +++ b/dbms/src/Server/tests/gtest_server_config.cpp @@ -371,10 +371,10 @@ dt_page_gc_low_write_prob = 0.2 std::unique_ptr path_pool = std::make_unique(global_ctx.getPathPool().withTable("test", "t1", false)); std::unique_ptr storage_pool = std::make_unique(global_ctx, /*ns_id*/ 100, *path_pool, "test.t1"); - auto verify_storage_pool_reload_config = [&global_ctx](std::unique_ptr & storage_pool) { + auto verify_storage_pool_reload_config = [&](std::unique_ptr & storage_pool) { DB::Settings & settings = global_ctx.getSettingsRef(); - auto cfg = storage_pool->data_storage_v2->getSettings(); + auto cfg = storage_pool->dataWriter()->getSettings(); EXPECT_NE(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num); EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num); EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes); @@ -384,9 +384,9 @@ dt_page_gc_low_write_prob = 0.2 EXPECT_NE(cfg.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds); EXPECT_NE(cfg.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000); - storage_pool->gc(settings, DM::StoragePool::Seconds(0)); + global_ctx.getGlobalStoragePool()->gc(); - cfg = storage_pool->data_storage_v2->getSettings(); + cfg = storage_pool->dataWriter()->getSettings(); EXPECT_EQ(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num); EXPECT_EQ(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num); EXPECT_EQ(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 1d0e00a5b58..35e6c3d00c6 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -50,6 +50,7 @@ extern const char force_triggle_foreground_flush[]; extern const char force_set_segment_ingest_packs_fail[]; extern const char segment_merge_after_ingest_packs[]; extern const char force_set_segment_physical_split[]; +extern const char force_set_page_file_write_errno[]; } // namespace FailPoints namespace DM @@ -495,6 +496,198 @@ try } CATCH +TEST_P(DeltaMergeStoreRWTest, WriteCrashBeforeWalWithoutCache) +try +{ + const ColumnDefine col_str_define(2, "col2", std::make_shared()); + const ColumnDefine col_i8_define(3, "i8", std::make_shared()); + { + auto table_column_defines = DMTestEnv::getDefaultColumns(); + table_column_defines->emplace_back(col_str_define); + table_column_defines->emplace_back(col_i8_define); + + store = reload(table_column_defines); + } + + { + // check column structure + const auto & cols = store->getTableColumns(); + ASSERT_EQ(cols.size(), 5UL); + const auto & str_col = cols[3]; + ASSERT_EQ(str_col.name, col_str_define.name); + ASSERT_EQ(str_col.id, col_str_define.id); + ASSERT_TRUE(str_col.type->equals(*col_str_define.type)); + const auto & i8_col = cols[4]; + ASSERT_EQ(i8_col.name, col_i8_define.name); + ASSERT_EQ(i8_col.id, col_i8_define.id); + ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type)); + } + + const size_t num_rows_write = 128; + { + // write to store + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of col2:String for test + block.insert(DB::tests::createColumn( + createNumberStrings(0, num_rows_write), + col_str_define.name, + col_str_define.id)); + // Add a column of i8:Int8 for test + block.insert(DB::tests::createColumn( + createSignedNumbers(0, num_rows_write), + col_i8_define.name, + col_i8_define.id)); + } + db_context->getSettingsRef().dt_segment_delta_cache_limit_rows = 8; + FailPointHelper::enableFailPoint(FailPoints::force_set_page_file_write_errno); + ASSERT_THROW(store->write(*db_context, db_context->getSettingsRef(), block), DB::Exception); + try + { + store->write(*db_context, db_context->getSettingsRef(), block); + } + catch (DB::Exception & e) + { + if (e.code() != ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR) + throw; + } + } + FailPointHelper::disableFailPoint(FailPoints::force_set_page_file_write_errno); + + { + // read all columns from store + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*db_context, + db_context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + TRACING_NAME, + /* expected_block_size= */ 1024)[0]; + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, 0); + } +} +CATCH + +TEST_P(DeltaMergeStoreRWTest, WriteCrashBeforeWalWithCache) +try +{ + const ColumnDefine col_str_define(2, "col2", std::make_shared()); + const ColumnDefine col_i8_define(3, "i8", std::make_shared()); + { + auto table_column_defines = DMTestEnv::getDefaultColumns(); + table_column_defines->emplace_back(col_str_define); + table_column_defines->emplace_back(col_i8_define); + + store = reload(table_column_defines); + } + + { + // check column structure + const auto & cols = store->getTableColumns(); + ASSERT_EQ(cols.size(), 5UL); + const auto & str_col = cols[3]; + ASSERT_EQ(str_col.name, col_str_define.name); + ASSERT_EQ(str_col.id, col_str_define.id); + ASSERT_TRUE(str_col.type->equals(*col_str_define.type)); + const auto & i8_col = cols[4]; + ASSERT_EQ(i8_col.name, col_i8_define.name); + ASSERT_EQ(i8_col.id, col_i8_define.id); + ASSERT_TRUE(i8_col.type->equals(*col_i8_define.type)); + } + + const size_t num_rows_write = 128; + { + // write to store + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of col2:String for test + block.insert(DB::tests::createColumn( + createNumberStrings(0, num_rows_write), + col_str_define.name, + col_str_define.id)); + // Add a column of i8:Int8 for test + block.insert(DB::tests::createColumn( + createSignedNumbers(0, num_rows_write), + col_i8_define.name, + col_i8_define.id)); + } + + FailPointHelper::enableFailPoint(FailPoints::force_set_page_file_write_errno); + store->write(*db_context, db_context->getSettingsRef(), block); + ASSERT_THROW(store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())), DB::Exception); + try + { + store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + } + catch (DB::Exception & e) + { + if (e.code() != ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR) + throw; + } + } + FailPointHelper::disableFailPoint(FailPoints::force_set_page_file_write_errno); + + { + // read all columns from store + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*db_context, + db_context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + TRACING_NAME, + /* expected_block_size= */ 1024)[0]; + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == DMTestEnv::pk_name) + { + //printf("pk:%lld\n", c->getInt(i)); + EXPECT_EQ(c->getInt(i), i); + } + else if (iter.name == col_str_define.name) + { + //printf("%s:%s\n", col_str_define.name.c_str(), c->getDataAt(i).data); + EXPECT_EQ(c->getDataAt(i), DB::toString(i)); + } + else if (iter.name == col_i8_define.name) + { + //printf("%s:%lld\n", col_i8_define.name.c_str(), c->getInt(i)); + Int64 num = i * (i % 2 == 0 ? -1 : 1); + EXPECT_EQ(c->getInt(i), num); + } + } + } + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, num_rows_write); + } +} +CATCH + TEST_P(DeltaMergeStoreRWTest, DeleteRead) try { diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 78d800dc3af..cec6e297d0e 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -407,7 +407,9 @@ class PageWriter : private boost::noncopyable // Only used for DATA transform data void writeIntoV3(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const; +#ifndef DBMS_PUBLIC_GTEST private: +#endif void writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const; // A wrap of getSettings only used for `RegionPersister::gc` diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp index 98d84989dd9..f7ce3180172 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -44,17 +44,16 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic storage_path_pool_v3 = std::make_unique(Strings{path}, Strings{path}, Strings{}, std::make_shared(0, paths, caps, Strings{}, caps), global_context.getFileProvider(), true); global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); - if (!global_context.getGlobalStoragePool()) - global_context.initializeGlobalStoragePoolIfNeed(*storage_path_pool_v3); } void SetUp() override { + auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext(); + global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); TiFlashStorageTestBasic::SetUp(); const auto & path = getTemporaryPath(); createIfNotExist(path); - auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext(); std::vector caps = {}; Strings paths = {path}; @@ -76,7 +75,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic PageStorageRunMode reloadMixedStoragePool() { - DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::MIX_MODE); + db_context->setPageStorageRunMode(PageStorageRunMode::MIX_MODE); PageStorageRunMode run_mode = storage_pool_mix->restore(); page_writer_mix = storage_pool_mix->logWriter(); page_reader_mix = storage_pool_mix->logReader(); @@ -85,7 +84,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic void reloadV2StoragePool() { - DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::ONLY_V2); + db_context->setPageStorageRunMode(PageStorageRunMode::ONLY_V2); storage_pool_v2->restore(); page_writer_v2 = storage_pool_v2->logWriter(); page_reader_v2 = storage_pool_v2->logReader(); diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp index 16a35f42da1..963e3a3571d 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp @@ -65,6 +65,11 @@ class RegionPersister_test : public ::testing::Test String dir_path; DB::Timestamp tso = 0; + + String getPageStorageV3MetaPath(String & path) + { + return path + "/page/kvstore/wal/log_1_0"; + } }; static ::testing::AssertionResult PeerCompare( @@ -251,7 +256,7 @@ try } // If we truncate page data file, exception will throw instead of droping last region. - auto meta_path = path + "/kvstore/page_1_0/meta"; // First page + auto meta_path = getPageStorageV3MetaPath(path); // First page Poco::File meta_file(meta_path); size_t size = meta_file.getSize(); int rt = ::truncate(meta_path.c_str(), size - 1); // Remove last one byte @@ -288,9 +293,13 @@ try { std::string path = dir_path + "/compatible_mode"; + auto current_storage_run_mode = TiFlashTestEnv::getGlobalContext().getPageStorageRunMode(); // Force to run in compatible mode for the default region persister FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); - SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); }); + SCOPE_EXIT( + { FailPointHelper::disableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); + TiFlashTestEnv::getGlobalContext().setPageStorageRunMode(current_storage_run_mode); }); + TiFlashTestEnv::getGlobalContext().setPageStorageRunMode(PageStorageRunMode::ONLY_V2); auto ctx = TiFlashTestEnv::getContext(DB::Settings(), Strings{ path, diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index 34355d43775..264fd6009a3 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -94,6 +94,7 @@ Context TiFlashTestEnv::getContext(const DB::Settings & settings, Strings testda context.setPath(root_path); auto paths = getPathPool(testdata_path); context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); + global_context->initializeGlobalStoragePoolIfNeed(context.getPathPool()); context.getSettingsRef() = settings; return context; } diff --git a/dbms/src/TestUtils/TiFlashTestEnv.h b/dbms/src/TestUtils/TiFlashTestEnv.h index 65dad63d937..0264d87ef9f 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.h +++ b/dbms/src/TestUtils/TiFlashTestEnv.h @@ -88,7 +88,7 @@ class TiFlashTestEnv static Context getContext(const DB::Settings & settings = DB::Settings(), Strings testdata_path = {}); - static void initializeGlobalContext(Strings testdata_path = {}, bool enable_ps_v3 = false); + static void initializeGlobalContext(Strings testdata_path = {}, bool enable_ps_v3 = true); static Context & getGlobalContext() { return *global_context; } static void shutdown();