diff --git a/dbms/src/Storages/Page/V3/BlobFile.cpp b/dbms/src/Storages/Page/V3/BlobFile.cpp index 5c50cf61864..ae5c61c2a0f 100644 --- a/dbms/src/Storages/Page/V3/BlobFile.cpp +++ b/dbms/src/Storages/Page/V3/BlobFile.cpp @@ -26,8 +26,12 @@ extern const char exception_before_page_file_write_sync[]; namespace PS::V3 { BlobFile::BlobFile(String path_, - FileProviderPtr file_provider_) - : file_provider{file_provider_} + BlobFileId blob_id_, + FileProviderPtr file_provider_, + PSDiskDelegatorPtr delegator_) + : blob_id(blob_id_) + , file_provider{std::move(file_provider_)} + , delegator(std::move(delegator_)) , path(path_) { // TODO: support encryption file @@ -36,6 +40,22 @@ BlobFile::BlobFile(String path_, getEncryptionPath(), false, /*create_new_encryption_info_*/ false); + + Poco::File file_in_disk(getPath()); + file_size = file_in_disk.getSize(); + { + std::lock_guard lock(file_size_lock); + + // If file_size is 0, we still need insert it. + PageFileIdAndLevel id_lvl{blob_id, 0}; + if (!delegator->fileExist(id_lvl)) + { + delegator->addPageFileUsedSize(id_lvl, + file_size, + path, + /*need_insert_location*/ true); + } + } } void BlobFile::read(char * buffer, size_t offset, size_t size, const ReadLimiterPtr & read_limiter) @@ -77,11 +97,37 @@ void BlobFile::write(char * buffer, size_t offset, size_t size, const WriteLimit PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, false); #endif PageUtil::syncFile(wrfile); + + UInt64 expand_size = 0; + { + std::lock_guard lock(file_size_lock); + if ((offset + size) > file_size) + { + expand_size = offset + size - file_size; + file_size = offset + size; + } + } + + if (expand_size != 0) + { + delegator->addPageFileUsedSize(std::make_pair(blob_id, 0), + expand_size, + path, + false); + } } void BlobFile::truncate(size_t size) { PageUtil::ftruncateFile(wrfile, size); + Int64 shrink_size = 0; + { + std::lock_guard lock(file_size_lock); + assert(size <= file_size); + shrink_size = file_size - size; + file_size = size; + } + delegator->freePageFileUsedSize(std::make_pair(blob_id, 0), shrink_size, path); } void BlobFile::remove() @@ -95,6 +141,8 @@ void BlobFile::remove() { file_provider->deleteRegularFile(getPath(), getEncryptionPath()); } + + delegator->removePageFile(std::make_pair(blob_id, 0), file_size, false, false); } BlobFile::~BlobFile() diff --git a/dbms/src/Storages/Page/V3/BlobFile.h b/dbms/src/Storages/Page/V3/BlobFile.h index 146b2b5d2b8..61c10806ce5 100644 --- a/dbms/src/Storages/Page/V3/BlobFile.h +++ b/dbms/src/Storages/Page/V3/BlobFile.h @@ -22,6 +22,7 @@ #include #include #include +#include namespace DB::PS::V3 { @@ -29,7 +30,9 @@ class BlobFile { public: BlobFile(String path_, - FileProviderPtr file_provider_); + BlobFileId blob_id_, + FileProviderPtr file_provider_, + PSDiskDelegatorPtr delegator_); ~BlobFile(); @@ -54,10 +57,16 @@ class BlobFile void remove(); private: + const BlobFileId blob_id; + FileProviderPtr file_provider; + PSDiskDelegatorPtr delegator; String path; WriteReadableFilePtr wrfile; + + std::mutex file_size_lock; + BlobFileOffset file_size; }; using BlobFilePtr = std::shared_ptr; diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 5901312445c..9d1455a1e11 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include #include @@ -23,8 +25,12 @@ #include #include +#include +#include #include +#include #include + namespace ProfileEvents { extern const Event PSMWritePages; @@ -51,12 +57,12 @@ using ChecksumClass = Digest::CRC64; * BlobStore methods * *********************/ -BlobStore::BlobStore(const FileProviderPtr & file_provider_, String path_, BlobStore::Config config_) - : file_provider(file_provider_) - , path(path_) +BlobStore::BlobStore(const FileProviderPtr & file_provider_, PSDiskDelegatorPtr delegator_, BlobStore::Config config_) + : delegator(std::move(delegator_)) + , file_provider(file_provider_) , config(config_) , log(Logger::get("BlobStore")) - , blob_stats(log, config_) + , blob_stats(log, delegator, config_) , cached_files(config.cached_fd_size) { } @@ -559,60 +565,64 @@ std::vector BlobStore::getGCStats() const auto stats_list = blob_stats.getStats(); std::vector blob_need_gc; - for (const auto & stat : stats_list) + for (const auto & [path, stats] : stats_list) { - if (stat->isReadOnly()) + (void)path; + for (const auto & stat : stats) { - LOG_FMT_TRACE(log, "Current [blob_id={}] is read-only", stat->id); - continue; - } + if (stat->isReadOnly()) + { + LOG_FMT_TRACE(log, "Current [blob_id={}] is read-only", stat->id); + continue; + } - auto lock = stat->lock(); - auto right_margin = stat->smap->getRightMargin(); + auto lock = stat->lock(); + auto right_margin = stat->smap->getRightMargin(); - // Avoid divide by zero - if (right_margin == 0) - { - LOG_FMT_TRACE(log, "Current blob is empty [blob_id={}, total size(all invalid)={}].", stat->id, stat->sm_total_size); - continue; - } + // Avoid divide by zero + if (right_margin == 0) + { + LOG_FMT_TRACE(log, "Current blob is empty [blob_id={}, total size(all invalid)={}].", stat->id, stat->sm_total_size); + continue; + } - stat->sm_valid_rate = stat->sm_valid_size * 1.0 / right_margin; + stat->sm_valid_rate = stat->sm_valid_size * 1.0 / right_margin; - if (stat->sm_valid_rate > 1.0) - { - LOG_FMT_ERROR( - log, - "Current blob got an invalid rate {:.2f}, total size is {}, valid size is {}, right margin is {} [blob_id={}]", - stat->sm_valid_rate, - stat->sm_total_size, - stat->sm_valid_size, - right_margin, - stat->id); - assert(false); - continue; - } + if (stat->sm_valid_rate > 1.0) + { + LOG_FMT_ERROR( + log, + "Current blob got an invalid rate {:.2f}, total size is {}, valid size is {}, right margin is {} [blob_id={}]", + stat->sm_valid_rate, + stat->sm_total_size, + stat->sm_valid_size, + right_margin, + stat->id); + assert(false); + continue; + } - // Check if GC is required - if (stat->sm_valid_rate <= config.heavy_gc_valid_rate) - { - LOG_FMT_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, Need do compact GC", stat->id, stat->sm_valid_rate); - blob_need_gc.emplace_back(stat->id); + // Check if GC is required + if (stat->sm_valid_rate <= config.heavy_gc_valid_rate) + { + LOG_FMT_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, Need do compact GC", stat->id, stat->sm_valid_rate); + blob_need_gc.emplace_back(stat->id); - // Change current stat to read only - stat->changeToReadOnly(); - } - else - { - LOG_FMT_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, No need to GC.", stat->id, stat->sm_valid_rate); - } + // Change current stat to read only + stat->changeToReadOnly(); + } + else + { + LOG_FMT_TRACE(log, "Current [blob_id={}] valid rate is {:.2f}, No need to GC.", stat->id, stat->sm_valid_rate); + } - if (right_margin != stat->sm_total_size) - { - auto blobfile = getBlobFile(stat->id); - LOG_FMT_TRACE(log, "Truncate blob file [blob_id={}] [origin size={}] [truncated size={}]", stat->id, stat->sm_total_size, right_margin); - blobfile->truncate(right_margin); - stat->sm_total_size = right_margin; + if (right_margin != stat->sm_total_size) + { + auto blobfile = getBlobFile(stat->id); + LOG_FMT_TRACE(log, "Truncate blob file [blob_id={}] [origin size={}] [truncated size={}]", stat->id, stat->sm_total_size, right_margin); + blobfile->truncate(right_margin); + stat->sm_total_size = right_margin; + } } } @@ -752,15 +762,18 @@ PageEntriesEdit BlobStore::gc(std::map & } -String BlobStore::getBlobFilePath(BlobFileId blob_id) const +String BlobStore::getBlobFilePath(BlobFileId blob_id) { - return path + "/blobfile_" + DB::toString(blob_id); + PageFileIdAndLevel id_lvl{blob_id, 0}; + String parent_path = delegator->choosePath(id_lvl); + + return parent_path + "/blobfile_" + DB::toString(blob_id); } BlobFilePtr BlobStore::getBlobFile(BlobFileId blob_id) { return cached_files.getOrSet(blob_id, [this, blob_id]() -> BlobFilePtr { - return std::make_shared(getBlobFilePath(blob_id), file_provider); + return std::make_shared(getBlobFilePath(blob_id), blob_id, file_provider, delegator); }) .first; } @@ -769,9 +782,10 @@ BlobFilePtr BlobStore::getBlobFile(BlobFileId blob_id) * BlobStats methods * *********************/ -BlobStore::BlobStats::BlobStats(LoggerPtr log_, BlobStore::Config config_) +BlobStore::BlobStats::BlobStats(LoggerPtr log_, PSDiskDelegatorPtr delegator_, BlobStore::Config config_) : log(std::move(log_)) , config(config_) + , delegator(delegator_) { } @@ -784,10 +798,15 @@ void BlobStore::BlobStats::restoreByEntry(const PageEntryV3 & entry) void BlobStore::BlobStats::restore() { BlobFileId max_restored_file_id = 0; - for (const auto & stat : stats_map) + + for (auto & [path, stats] : stats_map) { - stat->recalculateSpaceMap(); - max_restored_file_id = std::max(stat->id, max_restored_file_id); + (void)path; + for (const auto & stat : stats) + { + stat->recalculateSpaceMap(); + max_restored_file_id = std::max(stat->id, max_restored_file_id); + } } // restore `roll_id` @@ -810,7 +829,22 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std: ErrorCodes::LOGICAL_ERROR); } - auto stat = createStatNotCheckingRoll(blob_file_id, guard); + for (auto & [path, stats] : stats_map) + { + (void)path; + for (const auto & stat : stats) + { + if (stat->id == blob_file_id) + { + throw Exception(fmt::format("BlobStats can not create [blob_id={}] which is exist", + blob_file_id), + ErrorCodes::LOGICAL_ERROR); + } + } + } + + // Create a stat without checking the file_id exist or not + auto stat = createStatNotChecking(blob_file_id, guard); // Roll to the next new blob id if (blob_file_id == roll_id) @@ -821,43 +855,39 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std: return stat; } -BlobStatPtr BlobStore::BlobStats::createStatNotCheckingRoll(BlobFileId blob_file_id, const std::lock_guard &) +BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, const std::lock_guard &) { - for (auto & stat : stats_map) - { - if (stat->id == blob_file_id) - { - throw Exception(fmt::format("BlobStats won't create [blob_id={}] which is exist", - blob_file_id), - ErrorCodes::LOGICAL_ERROR); - } - } - LOG_FMT_DEBUG(log, "Created a new BlobStat [blob_id={}]", blob_file_id); BlobStatPtr stat = std::make_shared( blob_file_id, - SpaceMap::createSpaceMap(static_cast(config.spacemap_type.get()), 0, config.file_limit_size)); - stat->sm_max_caps = config.file_limit_size; - stats_map.emplace_back(stat); + static_cast(config.spacemap_type.get()), + config.file_limit_size); + PageFileIdAndLevel id_lvl{blob_file_id, 0}; + stats_map[delegator->choosePath(id_lvl)].emplace_back(stat); return stat; } void BlobStore::BlobStats::eraseStat(const BlobStatPtr && stat, const std::lock_guard &) { - stats_map.remove(stat); + PageFileIdAndLevel id_lvl{stat->id, 0}; + stats_map[delegator->getPageFilePath(id_lvl)].remove(stat); } void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard & lock) { BlobStatPtr stat = nullptr; - for (auto & stat_in_map : stats_map) + for (auto & [path, stats] : stats_map) { - if (stat_in_map->id == blob_file_id) + (void)path; + for (const auto & stat_in_map : stats) { - stat = stat_in_map; - break; + if (stat_in_map->id == blob_file_id) + { + stat = stat_in_map; + break; + } } } @@ -883,18 +913,47 @@ std::pair BlobStore::BlobStats::chooseStat(size_t buf_s return std::make_pair(nullptr, roll_id); } - for (const auto & stat : stats_map) + // If the stats_map size changes, or stats_map_path_index is out of range, + // then make stats_map_path_index fit to current size. + stats_map_path_index %= stats_map.size(); + + auto stats_iter = stats_map.begin(); + std::advance(stats_iter, stats_map_path_index); + + size_t path_iter_idx = 0; + for (path_iter_idx = 0; path_iter_idx < stats_map.size(); ++path_iter_idx) { - if (!stat->isReadOnly() - && stat->sm_max_caps >= buf_size - && stat->sm_valid_rate < smallest_valid_rate) + // Try to find a suitable stat under current path (path=`stats_iter->first`) + for (const auto & stat : stats_iter->second) + { + if (!stat->isReadOnly() + && stat->sm_max_caps >= buf_size + && stat->sm_valid_rate < smallest_valid_rate) + { + smallest_valid_rate = stat->sm_valid_rate; + stat_ptr = stat; + } + } + + // Already find the available stat under current path. + if (stat_ptr != nullptr) + { + break; + } + + // Try to find stat in the next path. + stats_iter++; + if (stats_iter == stats_map.end()) { - smallest_valid_rate = stat->sm_valid_rate; - stat_ptr = stat; + stats_iter = stats_map.begin(); } } - if (!stat_ptr) + // advance the `stats_map_path_idx` without size checking + stats_map_path_index += path_iter_idx + 1; + + // Can not find a suitable stat under all paths + if (stat_ptr == nullptr) { return std::make_pair(nullptr, roll_id); } @@ -905,18 +964,22 @@ std::pair BlobStore::BlobStats::chooseStat(size_t buf_s BlobStatPtr BlobStore::BlobStats::blobIdToStat(BlobFileId file_id, bool restore_if_not_exist, bool ignore_not_exist) { auto guard = lock(); - for (auto & stat : stats_map) + for (const auto & [path, stats] : stats_map) { - if (stat->id == file_id) + (void)path; + for (const auto & stat : stats) { - return stat; + if (stat->id == file_id) + { + return stat; + } } } if (restore_if_not_exist) { - // Restore a stat without checking the roll_id - return createStatNotCheckingRoll(file_id, guard); + // Restore a stat without checking file_id exist or not and won't push forward the roll_id + return createStatNotChecking(file_id, guard); } if (!ignore_not_exist) diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h index 5756bebbf04..b73f78b29d1 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.h +++ b/dbms/src/Storages/Page/V3/BlobStore.h @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -88,10 +89,12 @@ class BlobStore : private Allocator std::mutex sm_lock; - BlobStat(BlobFileId id_, SpaceMapPtr && smap_) - : smap(std::move(smap_)) + public: + BlobStat(BlobFileId id_, SpaceMap::SpaceMapType sm_type, UInt64 sm_max_caps_) + : smap(SpaceMap::createSpaceMap(sm_type, 0, sm_max_caps_)) , id(id_) , type(BlobStatType::NORMAL) + , sm_max_caps(sm_max_caps_) {} [[nodiscard]] std::lock_guard lock() @@ -135,11 +138,11 @@ class BlobStore : private Allocator using BlobStatPtr = std::shared_ptr; public: - BlobStats(LoggerPtr log_, BlobStore::Config config); + BlobStats(LoggerPtr log_, PSDiskDelegatorPtr delegator_, BlobStore::Config config); [[nodiscard]] std::lock_guard lock() const; - BlobStatPtr createStatNotCheckingRoll(BlobFileId blob_file_id, const std::lock_guard &); + BlobStatPtr createStatNotChecking(BlobFileId blob_file_id, const std::lock_guard &); BlobStatPtr createStat(BlobFileId blob_file_id, const std::lock_guard &); @@ -165,12 +168,13 @@ class BlobStore : private Allocator BlobStatPtr blobIdToStat(BlobFileId file_id, bool restore_if_not_exist = false, bool ignore_not_exist = false); - std::list getStats() const + std::map> getStats() const { auto guard = lock(); return stats_map; } + #ifndef DBMS_PUBLIC_GTEST private: #endif @@ -185,11 +189,15 @@ class BlobStore : private Allocator BlobStore::Config config; BlobFileId roll_id = 1; - std::list stats_map; + std::map> stats_map; + // Index for selecting next path for creating new blobfile + UInt16 stats_map_path_index = 0; + + PSDiskDelegatorPtr delegator; mutable std::mutex lock_stats; }; - BlobStore(const FileProviderPtr & file_provider_, String path, BlobStore::Config config); + BlobStore(const FileProviderPtr & file_provider_, PSDiskDelegatorPtr delegator_, BlobStore::Config config); std::vector getGCStats(); @@ -242,7 +250,7 @@ class BlobStore : private Allocator */ void removePosFromStats(BlobFileId blob_id, BlobFileOffset offset, size_t size); - String getBlobFilePath(BlobFileId blob_id) const; + String getBlobFilePath(BlobFileId blob_id); BlobFilePtr getBlobFile(BlobFileId blob_id); @@ -250,9 +258,11 @@ class BlobStore : private Allocator #ifndef DBMS_PUBLIC_GTEST private: #endif + constexpr static const char * blob_prefix_name = "/blobfile_"; + + PSDiskDelegatorPtr delegator; FileProviderPtr file_provider; - String path{}; Config config; LoggerPtr log; diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index daf2571d8aa..53aeada573b 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -35,7 +35,7 @@ PageStorageImpl::PageStorageImpl( const FileProviderPtr & file_provider_) : DB::PageStorage(name, delegator_, config_, file_provider_) , log(Logger::get("PageStorage", name)) - , blob_store(file_provider_, delegator->defaultPath(), blob_config) + , blob_store(file_provider_, delegator, blob_config) { } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index a91181f812c..d1cf9e72b89 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include namespace DB::PS::V3::tests @@ -35,16 +36,22 @@ class BlobStoreStatsTest : public DB::base::TiFlashStorageTestBasic public: BlobStoreStatsTest() : logger(Logger::get("BlobStoreStatsTest")) - {} + { + auto path = getTemporaryPath(); + DB::tests::TiFlashTestEnv::tryRemovePath(path); + createIfNotExist(path); + delegator = std::make_shared(path); + } protected: BlobStore::Config config; LoggerPtr logger; + PSDiskDelegatorPtr delegator; }; TEST_F(BlobStoreStatsTest, RestoreEmpty) { - BlobStats stats(logger, config); + BlobStats stats(logger, delegator, config); stats.restore(); @@ -58,7 +65,7 @@ TEST_F(BlobStoreStatsTest, RestoreEmpty) TEST_F(BlobStoreStatsTest, Restore) try { - BlobStats stats(logger, config); + BlobStats stats(logger, delegator, config); BlobFileId file_id1 = 10; BlobFileId file_id2 = 12; @@ -89,7 +96,9 @@ try } auto stats_copy = stats.getStats(); - ASSERT_EQ(stats_copy.size(), 2); + + ASSERT_EQ(stats_copy.size(), 1); + ASSERT_EQ(stats_copy.begin()->second.size(), 2); EXPECT_EQ(stats.roll_id, 13); auto stat1 = stats.blobIdToStat(file_id1); @@ -111,7 +120,7 @@ CATCH TEST_F(BlobStoreStatsTest, testStats) { - BlobStats stats(logger, config); + BlobStats stats(logger, delegator, config); auto stat = stats.createStat(0, stats.lock()); @@ -120,7 +129,10 @@ TEST_F(BlobStoreStatsTest, testStats) stats.createStat(1, stats.lock()); stats.createStat(2, stats.lock()); - ASSERT_EQ(stats.stats_map.size(), 3); + auto stats_copy = stats.getStats(); + + ASSERT_EQ(stats_copy.size(), 1); + ASSERT_EQ(stats_copy.begin()->second.size(), 3); ASSERT_EQ(stats.roll_id, 3); stats.eraseStat(0, stats.lock()); @@ -135,7 +147,7 @@ TEST_F(BlobStoreStatsTest, testStat) BlobFileId blob_file_id = 0; BlobStore::BlobStats::BlobStatPtr stat; - BlobStats stats(logger, config); + BlobStats stats(logger, delegator, config); std::tie(stat, blob_file_id) = stats.chooseStat(10, stats.lock()); ASSERT_EQ(blob_file_id, 1); @@ -208,7 +220,7 @@ TEST_F(BlobStoreStatsTest, testFullStats) BlobStore::BlobStats::BlobStatPtr stat; BlobFileOffset offset = 0; - BlobStats stats(logger, config); + BlobStats stats(logger, delegator, config); stat = stats.createStat(1, stats.lock()); offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 1); @@ -252,19 +264,15 @@ class BlobStoreTest : public DB::base::TiFlashStorageTestBasic public: void SetUp() override { - path = getTemporaryPath(); + auto path = getTemporaryPath(); DB::tests::TiFlashTestEnv::tryRemovePath(path); - - Poco::File file(path); - if (!file.exists()) - { - file.createDirectories(); - } + createIfNotExist(path); + delegator = std::make_shared(path); } protected: BlobStore::Config config; - String path{}; + PSDiskDelegatorPtr delegator; }; TEST_F(BlobStoreTest, Restore) @@ -272,7 +280,7 @@ try { const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); config.file_limit_size = 2560; - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); BlobFileId file_id1 = 10; BlobFileId file_id2 = 12; @@ -304,19 +312,22 @@ try // check spacemap updated { - for (const auto & stat : blob_store.blob_stats.getStats()) + for (const auto & [path, stats] : blob_store.blob_stats.getStats()) { - if (stat->id == file_id1) - { - ASSERT_EQ(stat->sm_total_size, 2560); - ASSERT_EQ(stat->sm_valid_size, 640); - ASSERT_EQ(stat->sm_max_caps, 1024); - } - else if (stat->id == file_id2) + for (const auto & stat : stats) { - ASSERT_EQ(stat->sm_total_size, 2560); - ASSERT_EQ(stat->sm_valid_size, 512); - ASSERT_EQ(stat->sm_max_caps, 2048); + if (stat->id == file_id1) + { + ASSERT_EQ(stat->sm_total_size, 2560); + ASSERT_EQ(stat->sm_valid_size, 640); + ASSERT_EQ(stat->sm_max_caps, 1024); + } + else if (stat->id == file_id2) + { + ASSERT_EQ(stat->sm_total_size, 2560); + ASSERT_EQ(stat->sm_valid_size, 512); + ASSERT_EQ(stat->sm_max_caps, 2048); + } } } } @@ -331,7 +342,7 @@ TEST_F(BlobStoreTest, testWriteRead) size_t buff_nums = 21; size_t buff_size = 123; - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); char c_buff[buff_size * buff_nums]; WriteBatch wb; @@ -423,7 +434,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) off += data_sz; } - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); char c_buff[buff_size * buff_nums]; WriteBatch wb; @@ -479,7 +490,7 @@ TEST_F(BlobStoreTest, testWrite) try { const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); PageId page_id = 50; const size_t buff_size = 1024; @@ -589,7 +600,7 @@ TEST_F(BlobStoreTest, testWriteOutOfLimitSize) { config.file_limit_size = buff_size - 1; - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); WriteBatch wb; char c_buff[buff_size]; @@ -613,7 +624,7 @@ TEST_F(BlobStoreTest, testWriteOutOfLimitSize) size_t buffer_sizes[] = {buff_size, buff_size - 1, buff_size / 2 + 1}; for (auto & buf_size : buffer_sizes) { - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); WriteBatch wb; char c_buff1[buf_size]; @@ -656,7 +667,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats) size_t buff_size = 1024; size_t buff_nums = 10; PageId page_id = 50; - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); std::list remove_entries_idx1 = {1, 3, 4, 7, 9}; std::list remove_entries_idx2 = {6, 8}; @@ -740,7 +751,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2) size_t buff_size = 1024; size_t buff_nums = 10; PageId page_id = 50; - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); std::list remove_entries_idx = {0, 1, 2, 3, 4, 5, 6, 7}; WriteBatch wb; @@ -800,7 +811,7 @@ TEST_F(BlobStoreTest, GC) size_t buff_nums = 21; size_t buff_size = 123; - auto blob_store = BlobStore(file_provider, path, config); + auto blob_store = BlobStore(file_provider, delegator, config); char c_buff[buff_size * buff_nums]; WriteBatch wb; @@ -867,7 +878,7 @@ try BlobStore::Config config_with_small_file_limit_size; config_with_small_file_limit_size.file_limit_size = 100; - auto blob_store = BlobStore(file_provider, path, config_with_small_file_limit_size); + auto blob_store = BlobStore(file_provider, delegator, config_with_small_file_limit_size); char c_buff[buff_size * buff_nums]; WriteBatch wb; @@ -918,7 +929,7 @@ try BlobStore::Config config_with_small_file_limit_size; config_with_small_file_limit_size.file_limit_size = 100; - auto blob_store = BlobStore(file_provider, path, config_with_small_file_limit_size); + auto blob_store = BlobStore(file_provider, delegator, config_with_small_file_limit_size); char c_buff[buff_size * buff_nums]; WriteBatch wb; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 32dd4004883..fae0d5e6bbd 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -1988,7 +1988,9 @@ try { auto snap = dir->createSnapshot(); auto edit = dir->dumpSnapshotToEdit(snap); - BlobStore::BlobStats stats(log, BlobStore::Config{}); + auto path = getTemporaryPath(); + PSDiskDelegatorPtr delegator = std::make_shared(path); + BlobStore::BlobStats stats(log, delegator, BlobStore::Config{}); auto restored_dir = restore_from_edit(edit, stats); auto temp_snap = restored_dir->createSnapshot(); EXPECT_SAME_ENTRY(entry_1_v1, restored_dir->get(2, temp_snap).second); diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 2dda5ed30f7..d7f8280dd9c 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -314,7 +314,12 @@ void StoragePathPool::renamePath(const String & old_path, const String & new_pat //========================================================================================== template -String genericChoosePath(const std::vector & paths, const PathCapacityMetricsPtr & global_capacity, std::function & paths, size_t idx)> path_generator, Poco::Logger * log, const String & log_msg) +String genericChoosePath(const std::vector & paths, // + const PathCapacityMetricsPtr & global_capacity, // + std::function & paths, size_t idx)> path_generator, // + std::function path_getter, // + Poco::Logger * log, // + const String & log_msg) { if (paths.size() == 1) return path_generator(paths, 0); @@ -323,7 +328,7 @@ String genericChoosePath(const std::vector & paths, const PathCapacityMetrics std::vector stats; for (size_t i = 0; i < paths.size(); ++i) { - stats.emplace_back(std::get<0>(global_capacity->getFsStatsOfPath(paths[i].path))); + stats.emplace_back(std::get<0>(global_capacity->getFsStatsOfPath(path_getter(paths[i])))); total_available_size += stats.back().avail_size; } @@ -379,8 +384,13 @@ String StableDiskDelegator::choosePath() const = [](const StoragePathPool::MainPathInfos & paths, size_t idx) -> String { return fmt::format("{}/{}", paths[idx].path, StoragePathPool::STABLE_FOLDER_NAME); }; + + std::function path_getter = [](const StoragePathPool::MainPathInfo & info) -> String { + return info.path; + }; + const String log_msg = fmt::format("[type=stable] [database={}] [table={}]", pool.database, pool.table); - return genericChoosePath(pool.main_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); + return genericChoosePath(pool.main_path_infos, pool.global_capacity, path_generator, path_getter, pool.log, log_msg); } String StableDiskDelegator::getDTFilePath(UInt64 file_id, bool throw_on_not_exist) const @@ -496,6 +506,12 @@ void StableDiskDelegator::removeDTFile(UInt64 file_id) // Delta data //========================================================================================== + +bool PSDiskDelegatorMulti::fileExist(const PageFileIdAndLevel & /*id_lvl*/) const +{ + throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); +} + size_t PSDiskDelegatorMulti::numPaths() const { return pool.latest_path_infos.size(); @@ -524,6 +540,10 @@ String PSDiskDelegatorMulti::choosePath(const PageFileIdAndLevel & id_lvl) return fmt::format("{}/{}", paths[idx].path, this->path_prefix); }; + std::function path_getter = [](const StoragePathPool::LatestPathInfo & info) -> String { + return info.path; + }; + { std::lock_guard lock{pool.mutex}; /// If id exists in page_path_map, just return the same path @@ -532,7 +552,7 @@ String PSDiskDelegatorMulti::choosePath(const PageFileIdAndLevel & id_lvl) } const String log_msg = fmt::format("[type=ps_multi] [database={}] [table={}]", pool.database, pool.table); - return genericChoosePath(pool.latest_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); + return genericChoosePath(pool.latest_path_infos, pool.global_capacity, path_generator, path_getter, pool.log, log_msg); } size_t PSDiskDelegatorMulti::addPageFileUsedSize( @@ -566,6 +586,14 @@ size_t PSDiskDelegatorMulti::addPageFileUsedSize( return index; } +size_t PSDiskDelegatorMulti::freePageFileUsedSize( + const PageFileIdAndLevel & /*id_lvl*/, + size_t /*size_to_free*/, + const String & /*pf_parent_path*/) +{ + throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); +} + String PSDiskDelegatorMulti::getPageFilePath(const PageFileIdAndLevel & id_lvl) const { std::lock_guard lock{pool.mutex}; @@ -599,6 +627,11 @@ void PSDiskDelegatorMulti::removePageFile(const PageFileIdAndLevel & id_lvl, siz // Normal data //========================================================================================== +bool PSDiskDelegatorSingle::fileExist(const PageFileIdAndLevel & /*id_lvl*/) const +{ + throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); +} + size_t PSDiskDelegatorSingle::numPaths() const { return 1; @@ -634,6 +667,15 @@ size_t PSDiskDelegatorSingle::addPageFileUsedSize( return 0; } + +size_t PSDiskDelegatorSingle::freePageFileUsedSize( + const PageFileIdAndLevel & /*id_lvl*/, + size_t /*size_to_free*/, + const String & /*pf_parent_path*/) +{ + throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); +} + String PSDiskDelegatorSingle::getPageFilePath(const PageFileIdAndLevel & /*id_lvl*/) const { return fmt::format("{}/{}", pool.latest_path_infos[0].path, path_prefix); @@ -645,7 +687,7 @@ void PSDiskDelegatorSingle::removePageFile(const PageFileIdAndLevel & /*id_lvl*/ } //========================================================================================== -// Raft data +// Raft Region data //========================================================================================== PSDiskDelegatorRaft::PSDiskDelegatorRaft(PathPool & pool_) : pool(pool_) @@ -659,6 +701,11 @@ PSDiskDelegatorRaft::PSDiskDelegatorRaft(PathPool & pool_) } } +bool PSDiskDelegatorRaft::fileExist(const PageFileIdAndLevel & id_lvl) const +{ + return page_path_map.find(id_lvl) != page_path_map.end(); +} + size_t PSDiskDelegatorRaft::numPaths() const { return raft_path_infos.size(); @@ -681,6 +728,10 @@ String PSDiskDelegatorRaft::choosePath(const PageFileIdAndLevel & id_lvl) return paths[idx].path; }; + std::function path_getter = [](const RaftPathInfo & info) -> String { + return info.path; + }; + { std::lock_guard lock{mutex}; /// If id exists in page_path_map, just return the same path @@ -690,7 +741,7 @@ String PSDiskDelegatorRaft::choosePath(const PageFileIdAndLevel & id_lvl) // Else choose path randomly const String log_msg = "[type=ps_raft]"; - return genericChoosePath(raft_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); + return genericChoosePath(raft_path_infos, pool.global_capacity, path_generator, path_getter, pool.log, log_msg); } size_t PSDiskDelegatorRaft::addPageFileUsedSize( @@ -724,6 +775,37 @@ size_t PSDiskDelegatorRaft::addPageFileUsedSize( return index; } +size_t PSDiskDelegatorRaft::freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) +{ + String upper_path = getNormalizedPath(pf_parent_path); + UInt32 index = UINT32_MAX; + for (size_t i = 0; i < raft_path_infos.size(); i++) + { + if (raft_path_infos[i].path == upper_path) + { + index = i; + break; + } + } + + if (unlikely(index == UINT32_MAX)) + { + throw Exception(fmt::format("Unrecognized path {}", upper_path)); + } + + if (page_path_map.find(id_lvl) == page_path_map.end()) + { + throw Exception(fmt::format("Can not find path for PageFile [id={}_{}, path={}]", id_lvl.first, id_lvl.second, pf_parent_path)); + } + + // update global used size + pool.global_capacity->freeUsedSize(upper_path, size_to_free); + return index; +} + String PSDiskDelegatorRaft::getPageFilePath(const PageFileIdAndLevel & id_lvl) const { std::lock_guard lock{mutex}; @@ -756,6 +838,11 @@ void PSDiskDelegatorRaft::removePageFile(const PageFileIdAndLevel & id_lvl, size // Global page data //========================================================================================== +bool PSDiskDelegatorGlobalMulti::fileExist(const PageFileIdAndLevel & id_lvl) const +{ + return page_path_map.find(id_lvl) != page_path_map.end(); +} + size_t PSDiskDelegatorGlobalMulti::numPaths() const { return pool.listGlobalPagePaths().size(); @@ -777,30 +864,129 @@ Strings PSDiskDelegatorGlobalMulti::listPaths() const return paths; } -String PSDiskDelegatorGlobalMulti::choosePath(const PageFileIdAndLevel & /*id_lvl*/) +String PSDiskDelegatorGlobalMulti::choosePath(const PageFileIdAndLevel & id_lvl) { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + std::function path_generator = + [this](const Strings & paths, size_t idx) -> String { + return fmt::format("{}/{}", paths[idx], this->path_prefix); + }; + + std::function path_getter = [](const String & path_) -> String { + return path_; + }; + + { + std::lock_guard lock{mutex}; + /// If id exists in page_path_map, just return the same path + if (auto iter = page_path_map.find(id_lvl); iter != page_path_map.end()) + return path_generator(pool.listGlobalPagePaths(), iter->second); + } + + const String log_msg = "[type=global_ps_multi]"; + return genericChoosePath(pool.listGlobalPagePaths(), pool.global_capacity, path_generator, path_getter, pool.log, log_msg); } size_t PSDiskDelegatorGlobalMulti::addPageFileUsedSize( - const PageFileIdAndLevel & /*id_lvl*/, - size_t /*size_to_add*/, - const String & /*pf_parent_path*/, - bool /*need_insert_location*/) + const PageFileIdAndLevel & id_lvl, + size_t size_to_add, + const String & pf_parent_path, + bool need_insert_location) { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + // Get a normalized path without `path_prefix` and trailing '/' + String upper_path = removeTrailingSlash(Poco::Path(pf_parent_path).parent().toString()); + UInt32 index = UINT32_MAX; + + const auto & global_paths = pool.listGlobalPagePaths(); + for (size_t i = 0; i < global_paths.size(); i++) + { + if (global_paths[i] == upper_path) + { + index = i; + break; + } + } + + if (unlikely(index == UINT32_MAX)) + throw Exception(fmt::format("Unrecognized path {}", upper_path)); + + { + std::lock_guard lock{mutex}; + if (need_insert_location) + page_path_map[id_lvl] = index; + } + + // update global used size + pool.global_capacity->addUsedSize(upper_path, size_to_add); + return index; } -String PSDiskDelegatorGlobalMulti::getPageFilePath(const PageFileIdAndLevel & /*id_lvl*/) const +size_t PSDiskDelegatorGlobalMulti::freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + // Get a normalized path without `path_prefix` and trailing '/' + String upper_path = removeTrailingSlash(Poco::Path(pf_parent_path).parent().toString()); + UInt32 index = UINT32_MAX; + + const auto & global_paths = pool.listGlobalPagePaths(); + for (size_t i = 0; i < global_paths.size(); i++) + { + if (global_paths[i] == upper_path) + { + index = i; + break; + } + } + + if (unlikely(index == UINT32_MAX)) + { + throw Exception(fmt::format("Unrecognized path {}", upper_path)); + } + + if (page_path_map.find(id_lvl) == page_path_map.end()) + { + throw Exception(fmt::format("Can not find path for PageFile [id={}_{}, path={}]", id_lvl.first, id_lvl.second, pf_parent_path)); + } + + // update global used size + pool.global_capacity->freeUsedSize(upper_path, size_to_free); + return index; } -void PSDiskDelegatorGlobalMulti::removePageFile(const PageFileIdAndLevel & /*id_lvl*/, size_t /*file_size*/, bool /*meta_left*/, bool /*remove_from_default_path*/) +String PSDiskDelegatorGlobalMulti::getPageFilePath(const PageFileIdAndLevel & id_lvl) const { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + std::lock_guard lock{mutex}; + auto iter = page_path_map.find(id_lvl); + if (likely(iter != page_path_map.end())) + return fmt::format("{}/{}", pool.listGlobalPagePaths()[iter->second], path_prefix); + throw Exception(fmt::format("Can not find path for PageFile [id={}_{}]", id_lvl.first, id_lvl.second)); } +void PSDiskDelegatorGlobalMulti::removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool meta_left, bool remove_from_default_path) +{ + std::lock_guard lock{mutex}; + if (remove_from_default_path) + { + pool.global_capacity->freeUsedSize(pool.listGlobalPagePaths()[default_path_index], file_size); + } + else + { + auto iter = page_path_map.find(id_lvl); + if (unlikely(iter == page_path_map.end())) + return; + auto index = iter->second; + if (!meta_left) + page_path_map.erase(iter); + + pool.global_capacity->freeUsedSize(pool.listGlobalPagePaths()[index], file_size); + } +} + +bool PSDiskDelegatorGlobalSingle::fileExist(const PageFileIdAndLevel & id_lvl) const +{ + return page_path_map.find(id_lvl) != page_path_map.end(); +} size_t PSDiskDelegatorGlobalSingle::numPaths() const { @@ -822,26 +1008,47 @@ Strings PSDiskDelegatorGlobalSingle::listPaths() const String PSDiskDelegatorGlobalSingle::choosePath(const PageFileIdAndLevel & /*id_lvl*/) { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + return fmt::format("{}/{}", pool.listGlobalPagePaths()[0], path_prefix); } size_t PSDiskDelegatorGlobalSingle::addPageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_add, + const String & pf_parent_path, + bool need_insert_location) +{ + // We need a map for id_lvl -> path_index for function `fileExist` + if (need_insert_location) + { + std::lock_guard lock{mutex}; + page_path_map[id_lvl] = 0; + } + pool.global_capacity->addUsedSize(pf_parent_path, size_to_add); + return 0; +} + +size_t PSDiskDelegatorGlobalSingle::freePageFileUsedSize( const PageFileIdAndLevel & /*id_lvl*/, - size_t /*size_to_add*/, - const String & /*pf_parent_path*/, - bool /*need_insert_location*/) + size_t size_to_free, + const String & pf_parent_path) { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + pool.global_capacity->freeUsedSize(pf_parent_path, size_to_free); + return 0; } String PSDiskDelegatorGlobalSingle::getPageFilePath(const PageFileIdAndLevel & /*id_lvl*/) const { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + return fmt::format("{}/{}", pool.listGlobalPagePaths()[0], path_prefix); } -void PSDiskDelegatorGlobalSingle::removePageFile(const PageFileIdAndLevel & /*id_lvl*/, size_t /*file_size*/, bool /*meta_left*/, bool /*remove_from_default_path*/) +void PSDiskDelegatorGlobalSingle::removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool /*meta_left*/, bool /*remove_from_default_path*/) { - throw Exception("Not Implemented", ErrorCodes::NOT_IMPLEMENTED); + pool.global_capacity->freeUsedSize(pool.listGlobalPagePaths()[0], file_size); + + std::lock_guard lock{mutex}; + auto iter = page_path_map.find(id_lvl); + if (unlikely(iter != page_path_map.end())) + page_path_map.erase(iter); } } // namespace DB diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 399551fa241..fd7b70a4bbd 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -94,6 +94,8 @@ class PathPool using PageFilePathMap = std::unordered_map; friend class PSDiskDelegatorRaft; + friend class PSDiskDelegatorGlobalSingle; + friend class PSDiskDelegatorGlobalMulti; private: Strings main_data_paths; @@ -113,7 +115,7 @@ class PathPool class StableDiskDelegator : private boost::noncopyable { public: - StableDiskDelegator(StoragePathPool & pool_) + explicit StableDiskDelegator(StoragePathPool & pool_) : pool(pool_) {} @@ -136,7 +138,9 @@ class StableDiskDelegator : private boost::noncopyable class PSDiskDelegator : private boost::noncopyable { public: - virtual ~PSDiskDelegator() {} + virtual ~PSDiskDelegator() = default; + + virtual bool fileExist(const PageFileIdAndLevel & id_lvl) const = 0; virtual size_t numPaths() const = 0; @@ -153,6 +157,12 @@ class PSDiskDelegator : private boost::noncopyable bool need_insert_location) = 0; + virtual size_t freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) + = 0; + virtual String getPageFilePath(const PageFileIdAndLevel & id_lvl) const = 0; virtual void removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool meta_left, bool remove_from_default_path) = 0; @@ -166,6 +176,8 @@ class PSDiskDelegatorMulti : public PSDiskDelegator , path_prefix(std::move(prefix)) {} + bool fileExist(const PageFileIdAndLevel & id_lvl) const override; + size_t numPaths() const override; String defaultPath() const override; @@ -180,6 +192,11 @@ class PSDiskDelegatorMulti : public PSDiskDelegator const String & pf_parent_path, bool need_insert_location) override; + size_t freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) override; + String getPageFilePath(const PageFileIdAndLevel & id_lvl) const override; void removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool meta_left, bool remove_from_default_path) override; @@ -200,6 +217,8 @@ class PSDiskDelegatorSingle : public PSDiskDelegator , path_prefix(std::move(prefix)) {} + bool fileExist(const PageFileIdAndLevel & id_lvl) const override; + size_t numPaths() const override; String defaultPath() const override; @@ -214,6 +233,11 @@ class PSDiskDelegatorSingle : public PSDiskDelegator const String & pf_parent_path, bool need_insert_location) override; + size_t freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) override; + String getPageFilePath(const PageFileIdAndLevel & id_lvl) const override; void removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool meta_left, bool remove_from_default_path) override; @@ -226,7 +250,9 @@ class PSDiskDelegatorSingle : public PSDiskDelegator class PSDiskDelegatorRaft : public PSDiskDelegator { public: - PSDiskDelegatorRaft(PathPool & pool_); + explicit PSDiskDelegatorRaft(PathPool & pool_); + + bool fileExist(const PageFileIdAndLevel & id_lvl) const override; size_t numPaths() const override; @@ -242,6 +268,11 @@ class PSDiskDelegatorRaft : public PSDiskDelegator const String & pf_parent_path, bool need_insert_location) override; + size_t freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) override; + String getPageFilePath(const PageFileIdAndLevel & id_lvl) const override; void removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool meta_left, bool remove_from_default_path) override; @@ -269,6 +300,8 @@ class PSDiskDelegatorGlobalMulti : public PSDiskDelegator , path_prefix(std::move(prefix)) {} + bool fileExist(const PageFileIdAndLevel & id_lvl) const override; + size_t numPaths() const override; String defaultPath() const override; @@ -283,11 +316,18 @@ class PSDiskDelegatorGlobalMulti : public PSDiskDelegator const String & pf_parent_path, bool need_insert_location) override; + size_t freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) override; + String getPageFilePath(const PageFileIdAndLevel & id_lvl) const override; void removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool meta_left, bool remove_from_default_path) override; private: + mutable std::mutex mutex; + const PathPool & pool; const String path_prefix; // PageFileID -> path index @@ -303,6 +343,8 @@ class PSDiskDelegatorGlobalSingle : public PSDiskDelegator , path_prefix(std::move(prefix)) {} + bool fileExist(const PageFileIdAndLevel & id_lvl) const override; + size_t numPaths() const override; String defaultPath() const override; @@ -317,6 +359,11 @@ class PSDiskDelegatorGlobalSingle : public PSDiskDelegator const String & pf_parent_path, bool need_insert_location) override; + size_t freePageFileUsedSize( + const PageFileIdAndLevel & id_lvl, + size_t size_to_free, + const String & pf_parent_path) override; + String getPageFilePath(const PageFileIdAndLevel & id_lvl) const override; void removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size, bool meta_left, bool remove_from_default_path) override; @@ -324,6 +371,9 @@ class PSDiskDelegatorGlobalSingle : public PSDiskDelegator private: const PathPool & pool; const String path_prefix; + + mutable std::mutex mutex; + PathPool::PageFilePathMap page_path_map; }; /// A class to manage paths for the specified storage. diff --git a/dbms/src/Storages/tests/gtest_path_pool.cpp b/dbms/src/Storages/tests/gtest_path_pool.cpp index 15510ef9aa3..2c2a1a14aec 100644 --- a/dbms/src/Storages/tests/gtest_path_pool.cpp +++ b/dbms/src/Storages/tests/gtest_path_pool.cpp @@ -27,11 +27,11 @@ namespace DB { namespace tests { -class PathPool_test : public ::testing::Test +class PathPoolTest : public ::testing::Test { public: - PathPool_test() - : log(&Poco::Logger::get("PathPool_test")) + PathPoolTest() + : log(&Poco::Logger::get("PathPoolTest")) {} static void SetUpTestCase() {} @@ -52,7 +52,7 @@ class PathPool_test : public ::testing::Test Poco::Logger * log; }; -TEST_F(PathPool_test, AlignPaths) +TEST_F(PathPoolTest, AlignPaths) try { Strings paths = getMultiTestPaths(); @@ -80,10 +80,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -113,10 +113,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -147,10 +147,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -181,10 +181,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -196,7 +196,7 @@ try } CATCH -TEST_F(PathPool_test, UnalignPaths) +TEST_F(PathPoolTest, UnalignPaths) try { Strings paths = getMultiTestPaths(); @@ -224,10 +224,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -257,10 +257,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -291,10 +291,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -325,10 +325,10 @@ try ASSERT_EQ(path_get, chosen); } - for (size_t i = 0; i < res.size(); ++i) + for (const auto & r : res) { - auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(res[i])); - LOG_FMT_INFO(log, "[path={}] [used_size={}]", res[i], stat.used_size); + auto stat = std::get<0>(ctx.getPathCapacity()->getFsStatsOfPath(r)); + LOG_FMT_INFO(log, "[path={}] [used_size={}]", r, stat.used_size); } for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) @@ -357,7 +357,7 @@ class MockPathCapacityMetrics : public PathCapacityMetrics std::map disk_stats_map; }; -class PathCapcatity : public DB::base::TiFlashStorageTestBasic +class PathCapacity : public DB::base::TiFlashStorageTestBasic { void SetUp() override { @@ -387,7 +387,7 @@ class PathCapcatity : public DB::base::TiFlashStorageTestBasic std::string latest_data_path; }; -TEST_F(PathCapcatity, SingleDiskSinglePathTest) +TEST_F(PathCapacity, SingleDiskSinglePathTest) { size_t capactity = 100; size_t used = 10; @@ -439,7 +439,7 @@ TEST_F(PathCapcatity, SingleDiskSinglePathTest) } } -TEST_F(PathCapcatity, MultiDiskMultiPathTest) +TEST_F(PathCapacity, MultiDiskMultiPathTest) { MockPathCapacityMetrics capacity = MockPathCapacityMetrics(0, {main_data_path}, {100}, {latest_data_path}, {100}); @@ -500,7 +500,7 @@ TEST_F(PathCapcatity, MultiDiskMultiPathTest) ASSERT_EQ(total_stats.avail_size, 50 + 46); } -TEST_F(PathCapcatity, FsStats) +TEST_F(PathCapacity, FsStats) try { size_t global_capacity_quota = 10; diff --git a/dbms/src/TestUtils/MockDiskDelegator.h b/dbms/src/TestUtils/MockDiskDelegator.h index b3d807cca56..5b0fd7ea5b7 100644 --- a/dbms/src/TestUtils/MockDiskDelegator.h +++ b/dbms/src/TestUtils/MockDiskDelegator.h @@ -34,6 +34,11 @@ class MockDiskDelegatorSingle final : public PSDiskDelegator : path(std::move(path_)) {} + bool fileExist(const PageFileIdAndLevel & /*id_lvl*/) const + { + return true; + } + size_t numPaths() const { return 1; @@ -72,6 +77,14 @@ class MockDiskDelegatorSingle final : public PSDiskDelegator return 0; } + size_t freePageFileUsedSize( + const PageFileIdAndLevel & /*id_lvl*/, + size_t /*size_to_free*/, + const String & /*pf_parent_path*/) + { + return 0; + } + private: String path; }; @@ -86,6 +99,12 @@ class MockDiskDelegatorMulti final : public PSDiskDelegator throw Exception("Should not generate MockDiskDelegatorMulti with empty paths"); } + bool fileExist(const PageFileIdAndLevel & /*id_lvl*/) const + { + return true; + } + + size_t numPaths() const { return paths.size(); @@ -124,6 +143,14 @@ class MockDiskDelegatorMulti final : public PSDiskDelegator return 0; } + size_t freePageFileUsedSize( + const PageFileIdAndLevel & /*id_lvl*/, + size_t /*size_to_free*/, + const String & /*pf_parent_path*/) + { + return 0; + } + private: Strings paths; size_t choose_idx = 0;