From 3e0e6799022789e84458ce72a79526419342a876 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 10 Sep 2021 13:44:39 +0800 Subject: [PATCH] Fix duplicated ID DTFile that cause inconsistent query result (#2770) (#2774) --- .gitignore | 10 +++ dbms/CMakeLists.txt | 1 + dbms/src/Common/FailPoint.cpp | 3 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 1 - dbms/src/Storages/DeltaMerge/Segment.cpp | 22 +++--- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 50 +++++++++++-- dbms/src/Storages/DeltaMerge/StoragePool.h | 6 +- .../DeltaMerge/tests/dm_basic_include.h | 3 +- dbms/src/Storages/PathPool.cpp | 20 ++++-- dbms/src/Storages/PathPool.h | 4 +- dbms/src/TestUtils/TiFlashTestBasic.cpp | 71 +++++++++++++++++++ dbms/src/TestUtils/TiFlashTestBasic.h | 10 +-- dbms/src/TestUtils/gtests_dbms_main.cpp | 68 ++---------------- 13 files changed, 170 insertions(+), 99 deletions(-) create mode 100644 dbms/src/TestUtils/TiFlashTestBasic.cpp diff --git a/.gitignore b/.gitignore index a886242b529..8978259f8fc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,16 @@ *\# .tramp_history +# vscode clangd cache +.cache + +# JSON Compilation Database Format Specification +# https://clang.llvm.org/docs/JSONCompilationDatabase.html +compile_commands.json + +# git patch reject report +*.rej + # vim cache files *.swp diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 9c1dfc73959..44e989d2c14 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -294,6 +294,7 @@ if (ENABLE_TESTS) ${ClickHouse_SOURCE_DIR}/dbms/src/Server/StorageConfigParser.cpp ${ClickHouse_SOURCE_DIR}/dbms/src/Server/UserConfigParser.cpp ${ClickHouse_SOURCE_DIR}/dbms/src/Server/RaftConfigParser.cpp + ${ClickHouse_SOURCE_DIR}/dbms/src/TestUtils/TiFlashTestBasic.cpp ) target_include_directories(gtests_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR}) target_link_libraries(gtests_dbms gtest_main dbms clickhouse_functions) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 
7601314e9d1..0f17111cd03 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -58,7 +58,8 @@ std::unordered_map> FailPointHelper::f M(random_slow_page_storage_remove_expired_snapshots) \ M(random_slow_page_storage_list_all_live_files) \ M(force_set_safepoint_when_decode_block) \ - M(force_set_page_data_compact_batch) + M(force_set_page_data_compact_batch) \ + M(force_set_dtfile_exist_when_acquire_id) #define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \ M(pause_after_learner_read) \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index f17f385d45c..56133b85e3a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -510,7 +510,6 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ checkSegmentUpdate(dm_context, segment, ThreadType::Write); } - void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings & db_settings, const HandleRange & delete_range) { LOG_INFO(log, "Write into " << db_name << "." 
<< table_name << " delete range " << delete_range.toDebugString()); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 2c50edcebef..7fe8d373f14 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -116,17 +116,17 @@ StableValueSpacePtr createNewStable(DMContext & context, PageId stable_id, WriteBatches & wbs) { - auto delegate = context.path_pool.getStableDiskDelegator(); - auto store_path = delegate.choosePath(); + auto delegator = context.path_pool.getStableDiskDelegator(); + auto store_path = delegator.choosePath(); - PageId dmfile_id = context.storage_pool.newDataPageId(); - auto dmfile = writeIntoNewDMFile(context, schema_snap, input_stream, dmfile_id, store_path); + PageId dtfile_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto dmfile = writeIntoNewDMFile(context, schema_snap, input_stream, dtfile_id, store_path); auto stable = std::make_shared(stable_id); stable->setFiles({dmfile}); stable->saveMeta(wbs.meta); - wbs.data.putExternal(dmfile_id, 0); - delegate.addDTFile(dmfile_id, dmfile->getBytesOnDisk(), store_path); - + wbs.data.putExternal(dtfile_id, 0); + delegator.addDTFile(dtfile_id, dmfile->getBytesOnDisk(), store_path); + return stable; } @@ -771,15 +771,15 @@ std::optional Segment::prepareSplitLogical(DMContext & dm_co DMFiles my_stable_files; DMFiles other_stable_files; - auto delegate = dm_context.path_pool.getStableDiskDelegator(); + auto delegator = dm_context.path_pool.getStableDiskDelegator(); for (auto & dmfile : segment_snap->stable->getDMFiles()) { auto ori_ref_id = dmfile->refId(); auto file_id = segment_snap->delta->storage_snap->data_reader.getNormalPageId(ori_ref_id); - auto file_parent_path = delegate.getDTFilePath(file_id); + auto file_parent_path = delegator.getDTFilePath(file_id); - auto my_dmfile_id = storage_pool.newDataPageId(); - auto other_dmfile_id = storage_pool.newDataPageId(); + auto 
my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); wbs.data.putRefPage(my_dmfile_id, file_id); wbs.data.putRefPage(other_dmfile_id, file_id); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 83a80790b60..2735a8ea2ad 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -6,6 +8,10 @@ namespace DB { +namespace FailPoints +{ +extern const char force_set_dtfile_exist_when_acquire_id[]; +} // namespace FailPoints namespace DM { enum class StorageType @@ -66,7 +72,8 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const global_ctx.getTiFlashMetrics()), max_log_page_id(0), max_data_page_id(0), - max_meta_page_id(0) + max_meta_page_id(0), + global_context(global_ctx) { } @@ -88,6 +95,37 @@ void StoragePool::drop() log_storage.drop(); } +PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who) +{ + // In case that there is a DTFile created on disk but TiFlash crashes without persisting the ID. + // After TiFlash process restored, the ID will be inserted into the stable delegator, but we may + // get a duplicated ID from the `storage_pool.data`. (tics#2756) + PageId dtfile_id; + do + { + dtfile_id = ++max_data_page_id; + + auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false); + fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, { + static size_t fail_point_called = 0; + if (existed_path.empty() && fail_point_called % 10 == 0) + { + existed_path = ""; + } + fail_point_called++; + }); + if (likely(existed_path.empty())) + { + break; + } + // else there is a DTFile with that id, continue to acquire a new ID. 
+ LOG_WARNING(&Poco::Logger::get(who), + "The DTFile is already exists, continute to acquire another ID. [path=" + existed_path + + "] [id=" + DB::toString(dtfile_id) + "]"); + } while (true); + return dtfile_id; +} + bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_period) { { @@ -100,22 +138,22 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio last_try_gc_time = now; } - bool ok = false; + bool done_anything = false; // FIXME: The global_context.settings is mutable, we need a way to reload thses settings. // auto config = extractConfig(settings, StorageType::Meta); // meta_storage.reloadSettings(config); - ok |= meta_storage.gc(); + done_anything |= meta_storage.gc(); // config = extractConfig(settings, StorageType::Data); // data_storage.reloadSettings(config); - ok |= data_storage.gc(); + done_anything |= data_storage.gc(); // config = extractConfig(settings, StorageType::Log); // log_storage.reloadSettings(config); - ok |= log_storage.gc(); + done_anything |= log_storage.gc(); - return ok; + return done_anything; } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 6b7d3a81de0..a992fcdc4c2 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -10,6 +10,7 @@ namespace DB struct Settings; class Context; class StoragePathPool; +class StableDiskDelegator; namespace DM { @@ -33,9 +34,10 @@ class StoragePool : private boost::noncopyable PageId maxMetaPageId() { return max_meta_page_id; } PageId newLogPageId() { return ++max_log_page_id; } - PageId newDataPageId() { return ++max_data_page_id; } PageId newMetaPageId() { return ++max_meta_page_id; } + PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who); + PageStorage & log() { return log_storage; } PageStorage & data() { return data_storage; } PageStorage & meta() { return meta_storage; } @@ -57,6 +59,8 @@ 
class StoragePool : private boost::noncopyable std::atomic last_try_gc_time = Clock::now(); std::mutex mutex; + + const Context & global_context; }; struct StorageSnapshot diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index 7b8cffad201..822c6d981bb 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -27,8 +27,7 @@ inline ::testing::AssertionResult HandleRangeCompare(const char * lhs_exp { if (lhs == rhs) return ::testing::AssertionSuccess(); - else - return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); } #define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2) #define EXPECT_RANGE_EQ(val1, val2) EXPECT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2) diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 3b991da0b6c..6a84ea40484 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -339,25 +339,29 @@ String StableDiskDelegator::choosePath() const return genericChoosePath(pool.main_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); } -String StableDiskDelegator::getDTFilePath(UInt64 file_id) const +String StableDiskDelegator::getDTFilePath(UInt64 file_id, bool throw_on_not_exist) const { std::lock_guard lock{pool.mutex}; auto iter = pool.dt_file_path_map.find(file_id); if (likely(iter != pool.dt_file_path_map.end())) return pool.main_path_infos[iter->second].path + "/" + StoragePathPool::STABLE_FOLDER_NAME; - throw Exception("Can not find path for DMFile [id=" + toString(file_id) + "]"); + if (likely(throw_on_not_exist)) + throw Exception("Can not find path for DMFile [id=" + toString(file_id) + "]"); + return ""; } 
void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::string_view path) { path.remove_suffix(1 + strlen(StoragePathPool::STABLE_FOLDER_NAME)); // remove '/stable' added in listPathsForStable/getDTFilePath std::lock_guard lock{pool.mutex}; - if (auto iter = pool.dt_file_path_map.find(file_id); iter != pool.dt_file_path_map.end()) + if (auto iter = pool.dt_file_path_map.find(file_id); unlikely(iter != pool.dt_file_path_map.end())) { - auto & path_info = pool.main_path_infos[iter->second]; - pool.dt_file_path_map.erase(iter); - path_info.file_size_map.erase(file_id); + const auto & path_info = pool.main_path_infos[iter->second]; + throw DB::TiFlashException("Try to add a DTFile with duplicated id. [id=" + DB::toString(file_id) + "] [path=" + String(path) + + "] [existed_path=" + path_info.path + "]", + Errors::DeltaTree::Internal); } + UInt32 index = UINT32_MAX; for (size_t i = 0; i < pool.main_path_infos.size(); i++) { @@ -368,7 +372,9 @@ void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::strin } } if (unlikely(index == UINT32_MAX)) - throw Exception("Unrecognized path " + String(path)); + throw DB::TiFlashException( + "Try to add a DTFile to an unrecognized path. [id=" + DB::toString(file_id) + "] [path=" + String(path) + "]", + Errors::DeltaTree::Internal); pool.dt_file_path_map.emplace(file_id, index); pool.main_path_infos[index].file_size_map.emplace(file_id, file_size); // update global used size diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 587a3b7dcc4..ab26ff48f45 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -96,7 +96,9 @@ class StableDiskDelegator : private boost::noncopyable String choosePath() const; - String getDTFilePath(UInt64 file_id) const; + // Get the path of the DTFile with file_id. + // If throw_on_not_exist is false, return an empty string when the path does not exist.
+ String getDTFilePath(UInt64 file_id, bool throw_on_not_exist = true) const; void addDTFile(UInt64 file_id, size_t file_size, std::string_view path); diff --git a/dbms/src/TestUtils/TiFlashTestBasic.cpp b/dbms/src/TestUtils/TiFlashTestBasic.cpp new file mode 100644 index 00000000000..3879c05b08c --- /dev/null +++ b/dbms/src/TestUtils/TiFlashTestBasic.cpp @@ -0,0 +1,71 @@ +#include +#include +#include +#include + +namespace DB::tests +{ +std::unique_ptr TiFlashTestEnv::global_context = nullptr; + +void TiFlashTestEnv::initializeGlobalContext() +{ + // set itself as global context + global_context = std::make_unique(DB::Context::createGlobal()); + global_context->setGlobalContext(*global_context); + global_context->setApplicationType(DB::Context::ApplicationType::SERVER); + + global_context->initializeTiFlashMetrics(); + KeyManagerPtr key_manager = std::make_shared(false); + global_context->initializeFileProvider(key_manager, false); + + // These global variables should be initialized in the following order + // 1. capacity + // 2. path pool + // 3. TMTContext + + Strings testdata_path = {getTemporaryPath()}; + global_context->initializePathCapacityMetric(0, testdata_path, {}, {}, {}); + + auto paths = getPathPool(testdata_path); + global_context->setPathPool( + paths.first, paths.second, Strings{}, true, global_context->getPathCapacity(), global_context->getFileProvider()); + TiFlashRaftConfig raft_config; + + raft_config.ignore_databases = {"default", "system"}; + raft_config.engine = TiDB::StorageEngine::TMT; + raft_config.disable_bg_flush = false; + global_context->createTMTContext(raft_config, pingcap::ClusterConfig()); + + global_context->getTMTContext().restore(); +} + +Context TiFlashTestEnv::getContext(const DB::Settings & settings, Strings testdata_path) +{ + Context context = *global_context; + context.setGlobalContext(*global_context); + // Load `testdata_path` as path if it is set. + const String root_path = testdata_path.empty() ?
getTemporaryPath() : testdata_path[0]; + if (testdata_path.empty()) + testdata_path.push_back(root_path); + context.setPath(root_path); + auto paths = getPathPool(testdata_path); + context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); + context.getSettingsRef() = settings; + return context; +} + +void TiFlashTestEnv::shutdown() +{ + global_context->getTMTContext().setTerminated(); + global_context->shutdown(); + global_context.reset(); +} + +::testing::AssertionResult DataTypeCompare(const char * lhs_expr, const char * rhs_expr, const DataTypePtr & lhs, const DataTypePtr & rhs) +{ + if (lhs->equals(*rhs)) + return ::testing::AssertionSuccess(); + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs->getName(), rhs->getName(), false); +} + +} // namespace DB::tests diff --git a/dbms/src/TestUtils/TiFlashTestBasic.h b/dbms/src/TestUtils/TiFlashTestBasic.h index 0776b34c88f..1f2e16075dd 100644 --- a/dbms/src/TestUtils/TiFlashTestBasic.h +++ b/dbms/src/TestUtils/TiFlashTestBasic.h @@ -47,17 +47,11 @@ namespace tests } /// helper functions for comparing DataType -inline ::testing::AssertionResult DataTypeCompare( // +::testing::AssertionResult DataTypeCompare( // const char * lhs_expr, const char * rhs_expr, const DataTypePtr & lhs, - const DataTypePtr & rhs) -{ - if (lhs->equals(*rhs)) - return ::testing::AssertionSuccess(); - else - return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs->getName(), rhs->getName(), false); -} + const DataTypePtr & rhs); #define ASSERT_DATATYPE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::tests::DataTypeCompare, val1, val2) #define EXPECT_DATATYPE_EQ(val1, val2) EXPECT_PRED_FORMAT2(::DB::tests::DataTypeCompare, val1, val2) diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp index 2e29d2c892f..f7ac629b7b1 100644 --- a/dbms/src/TestUtils/gtests_dbms_main.cpp +++ b/dbms/src/TestUtils/gtests_dbms_main.cpp @@ -1,76 +1,22 @@ 
#include -#include -#include -#include -#include #include -namespace DB::tests +namespace DB::FailPoints { -std::unique_ptr TiFlashTestEnv::global_context = nullptr; - -void TiFlashTestEnv::initializeGlobalContext() -{ - // set itself as global context - global_context = std::make_unique(DB::Context::createGlobal()); - global_context->setGlobalContext(*global_context); - global_context->setApplicationType(DB::Context::ApplicationType::SERVER); - - global_context->initializeTiFlashMetrics(); - KeyManagerPtr key_manager = std::make_shared(false); - global_context->initializeFileProvider(key_manager, false); - - // Theses global variables should be initialized by the following order - // 1. capacity - // 2. path pool - // 3. TMTContext - - Strings testdata_path = {getTemporaryPath()}; - global_context->initializePathCapacityMetric(0, testdata_path, {}, {}, {}); - - auto paths = getPathPool(testdata_path); - global_context->setPathPool( - paths.first, paths.second, Strings{}, true, global_context->getPathCapacity(), global_context->getFileProvider()); - TiFlashRaftConfig raft_config; - - raft_config.ignore_databases = {"default", "system"}; - raft_config.engine = TiDB::StorageEngine::TMT; - raft_config.disable_bg_flush = false; - global_context->createTMTContext(raft_config, pingcap::ClusterConfig()); - - global_context->getTMTContext().restore(); -} - -Context TiFlashTestEnv::getContext(const DB::Settings & settings, Strings testdata_path) -{ - Context context = *global_context; - context.setGlobalContext(*global_context); - // Load `testdata_path` as path if it is set. - const String root_path = testdata_path.empty() ? 
getTemporaryPath() : testdata_path[0]; - if (testdata_path.empty()) - testdata_path.push_back(root_path); - context.setPath(root_path); - auto paths = getPathPool(testdata_path); - context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); - context.getSettingsRef() = settings; - return context; -} - -void TiFlashTestEnv::shutdown() -{ - global_context->getTMTContext().setTerminated(); - global_context->shutdown(); - global_context.reset(); -} -} // namespace DB::tests +extern const char force_set_dtfile_exist_when_acquire_id[]; +} // namespace DB::FailPoints int main(int argc, char ** argv) { DB::tests::TiFlashTestEnv::setupLogger(); DB::tests::TiFlashTestEnv::initializeGlobalContext(); +#ifdef FIU_ENABLE fiu_init(0); // init failpoint + DB::FailPointHelper::enableFailPoint(DB::FailPoints::force_set_dtfile_exist_when_acquire_id); +#endif + ::testing::InitGoogleTest(&argc, argv); auto ret = RUN_ALL_TESTS();