diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index c611bb62509..18532a395d3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -389,7 +389,7 @@ try if (iter.name == "pk") { EXPECT_EQ(c->getInt(i), expected_row_pk++); - std::cerr << "pk:" << c->getInt(i) << std::endl; + //std::cerr << "pk:" << c->getInt(i) << std::endl; } } } diff --git a/dbms/src/Storages/Page/PageEntries.h b/dbms/src/Storages/Page/PageEntries.h index 58bf16bfef9..faf4cada80a 100644 --- a/dbms/src/Storages/Page/PageEntries.h +++ b/dbms/src/Storages/Page/PageEntries.h @@ -1,10 +1,10 @@ #pragma once #include +#include #include #include #include -#include #include #include @@ -75,6 +75,15 @@ class PageEntriesMixin } } + inline std::optional findNormalPageEntry(PageId page_id) const + { + auto iter = normal_pages.find(page_id); + if (iter == normal_pages.end()) + return std::nullopt; + else + return iter->second; + } + inline PageEntry & at(const PageId page_id) { PageId normal_page_id = resolveRefId(page_id); diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp index b5d78cb2e58..beffba29491 100644 --- a/dbms/src/Storages/Page/PageFile.cpp +++ b/dbms/src/Storages/Page/PageFile.cpp @@ -512,6 +512,14 @@ void PageFile::destroy() const } } +bool PageFile::isExist() const +{ + Poco::File file(folderPath()); + Poco::File data_file(dataPath()); + Poco::File meta_file(metaPath()); + return (file.exists() && data_file.exists() && meta_file.exists()); +} + UInt64 PageFile::getDataFileSize() const { Poco::File file(dataPath()); diff --git a/dbms/src/Storages/Page/PageFile.h b/dbms/src/Storages/Page/PageFile.h index 34d74386f6f..2c3f1460825 100644 --- a/dbms/src/Storages/Page/PageFile.h +++ b/dbms/src/Storages/Page/PageFile.h @@ -111,6 +111,7 @@ class PageFile : public Allocator bool isValid() const { return file_id; } UInt64 getDataFileAppendPos() const { return data_file_pos; } UInt64 getDataFileSize() const; + bool isExist() const; private: /// Create a new page file. diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 1f1d84bc5da..3ddb8c6583f 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -366,6 +366,8 @@ bool PageStorage::gc() } auto && [valid_size, valid_page_ids_in_file] = file_valid_pages[page_entry->fileIdLevel()]; valid_size += page_entry->size; + valid_page_ids_in_file.emplace_back(page_id); + } #else for (auto iter = snapshot->version()->pages_cbegin(); iter != snapshot->version()->pages_cend(); ++iter) { @@ -373,9 +375,9 @@ bool PageStorage::gc() const PageEntry & page_entry = iter->second; auto && [valid_size, valid_page_ids_in_file] = file_valid_pages[page_entry.fileIdLevel()]; valid_size += page_entry.size; -#endif valid_page_ids_in_file.emplace_back(page_id); } +#endif } // Select gc candidate files into `merge_files` @@ -393,11 +395,8 @@ bool PageStorage::gc() return false; } - LOG_INFO(log, - storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions"); - // There are no valid pages to be migrated but valid ref pages, scan over all `merge_files` and do migrate. - gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files); + gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files, migrate_page_count); } std::set live_files; @@ -469,15 +468,33 @@ PageStorage::GcCandidates PageStorage::gcSelectCandidateFiles( // keep readable return merge_files; } -PageEntriesEdit -PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const +PageEntriesEdit PageStorage::gcMigratePages(const SnapshotPtr & snapshot, + const GcLivesPages & file_valid_pages, + const GcCandidates & merge_files, + const size_t migrate_page_count) const { PageEntriesEdit gc_file_edit; // merge `merge_files` to PageFile which PageId = max of all `merge_files` and level = level + 1 auto [largest_file_id, level] = *(merge_files.rbegin()); - PageFile gc_file = PageFile::newPageFile(largest_file_id, level + 1, storage_path, /* is_tmp= */ true, page_file_log); + { + // In case that those files are hold by snapshot and do gcMigrate to same PageFile again, we need to check if gc_file is already exist. + PageFile gc_file = PageFile::openPageFileForRead(largest_file_id, level + 1, storage_path, page_file_log); + if (gc_file.isExist()) + { + LOG_INFO(log, storage_name << " GC migration to PageFile_" << largest_file_id << "_" << level + 1 << " is done before."); + return gc_file_edit; + } + } + + // Create a tmp PageFile for migration + PageFile gc_file = PageFile::newPageFile(largest_file_id, level + 1, storage_path, /* is_tmp= */ true, page_file_log); + LOG_INFO(log, + storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count + << " regions to PageFile_" << largest_file_id << "_" << level + 1); + + // We should check these nums, if any of them is non-zero, we should set `gc_file` to formal. size_t num_successful_migrate_pages = 0; size_t num_valid_ref_pages = 0; size_t num_del_page_meta = 0; @@ -508,7 +525,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f { try { - const auto page_entry = current->find(page_id); + const auto page_entry = current->findNormalPageEntry(page_id); if (!page_entry) continue; // This page is covered by newer file. @@ -570,14 +587,15 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f } } // free gc_file_writer and sync - if (gc_file_edit.empty() && num_valid_ref_pages == 0) + const auto id = gc_file.fileIdLevel(); + if (gc_file_edit.empty() && num_valid_ref_pages == 0 && num_del_page_meta == 0) { + LOG_INFO(log, storage_name << " No valid pages, deleting PageFile_" << id.first << "_" << id.second); gc_file.destroy(); } else { gc_file.setFormal(); - const auto id = gc_file.fileIdLevel(); LOG_INFO(log, storage_name << " GC have migrated " << num_successful_migrate_pages // << " regions and " << num_valid_ref_pages // @@ -589,7 +607,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f /** * Delete obsolete files that are not used by any version - * @param page_files All avaliable files in disk + * @param page_files All available files in disk * @param writing_file_id_level The PageFile id which is writing to * @param live_files The live files after gc */ @@ -607,7 +625,7 @@ void PageStorage::gcRemoveObsoleteFiles(const std::set; using GcLivesPages = std::map>; - GcCandidates gcSelectCandidateFiles(const std::set & page_files, - const GcLivesPages & file_valid_pages, - const PageFileIdAndLevel & writing_file_id_level, - UInt64 & candidate_total_size, - size_t & migrate_page_count) const; - PageEntriesEdit - gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const; + GcCandidates gcSelectCandidateFiles(const std::set & page_files, + const GcLivesPages & file_valid_pages, + const PageFileIdAndLevel & writing_file_id_level, + UInt64 & candidate_total_size, + size_t & migrate_page_count) const; + PageEntriesEdit gcMigratePages(const SnapshotPtr & snapshot, + const GcLivesPages & file_valid_pages, + const GcCandidates & merge_files, + size_t migrate_page_count) const; static void gcRemoveObsoleteFiles(const std::set & page_files, const PageFileIdAndLevel & writing_file_id_level, diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesBuilder.h b/dbms/src/Storages/Page/VersionSet/PageEntriesBuilder.h index b2458cd2281..0c6178fd0b7 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesBuilder.h +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesBuilder.h @@ -44,8 +44,8 @@ class PageEntriesBuilder { if (rec.type != WriteBatch::WriteType::PUT) continue; - // Gc only apply PUT for updating page entries - const auto old_page_entry = old_version->find(rec.page_id); + // Gc only apply PUT for updating normal page entries + const auto old_page_entry = old_version->findNormalPageEntry(rec.page_id); // If the gc page have already been removed, or is a ref to non-exist page, just ignore it if (!old_page_entry) continue; diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp index 946fc04567b..939ed67160b 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp @@ -13,25 +13,27 @@ namespace DB std::set PageEntriesVersionSetWithDelta::gcApply(PageEntriesEdit & edit) { - std::unique_lock lock(read_mutex); - - if (current.use_count() == 1 && current->isBase()) - { - // If no readers, we could directly merge edits - EditAcceptor::gcApplyInplace(current, edit); - } - else + if (!edit.empty()) { - if (current.use_count() != 1) + std::unique_lock lock(read_mutex); + + if (current.use_count() == 1 && current->isBase()) { - VersionPtr v = VersionType::createDelta(); - appendVersion(std::move(v)); + // If no readers, we could directly merge edits + EditAcceptor::gcApplyInplace(current, edit); + } + else + { + if (current.use_count() != 1) + { + VersionPtr v = VersionType::createDelta(); + appendVersion(std::move(v)); + } + auto view = std::make_shared(current); + EditAcceptor builder(view.get()); + builder.gcApply(edit); } - auto view = std::make_shared(current); - EditAcceptor builder(view.get()); - builder.gcApply(edit); } - return listAllLiveFiles(); } diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesView.h b/dbms/src/Storages/Page/VersionSet/PageEntriesView.h index f6857fd213a..e3baabfc5c8 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesView.h +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesView.h @@ -25,7 +25,7 @@ class PageEntriesView std::set validPageIds() const; // For iterate over all normal pages - std::set validNormalPageIds() const; + std::set validNormalPageIds() const; std::optional findNormalPageEntry(PageId page_id) const; PageId maxId() const; diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp index 79576777039..b25a3d02204 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp @@ -250,7 +250,7 @@ TEST_F(PageStorage_test, GcMigrateValidRefPages) PageStorage::SnapshotPtr snapshot = storage->getSnapshot(); //candidates.insert(PageFileIdAndLevel{1, 0}); candidates.insert(PageFileIdAndLevel{2, 0}); - const PageEntriesEdit gc_file_edit = storage->gcMigratePages(snapshot, lives_pages, candidates); + const PageEntriesEdit gc_file_edit = storage->gcMigratePages(snapshot, lives_pages, candidates, 3); ASSERT_FALSE(gc_file_edit.empty()); // check the ref is migrated. // check the deleted ref is not migrated. @@ -273,6 +273,81 @@ TEST_F(PageStorage_test, GcMigrateValidRefPages) ASSERT_FALSE(is_deleted_ref_id_exists); } +TEST_F(PageStorage_test, GcMoveNormalPage) +{ + const size_t buf_sz = 256; + char c_buff[buf_sz]; + + { + WriteBatch batch; + memset(c_buff, 0xf, buf_sz); + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, 0, buff, buf_sz); + batch.putRefPage(2, 1); + batch.putRefPage(3, 2); + + batch.delPage(1); + + storage->write(batch); + } + + PageFileIdAndLevel id_and_lvl = {1, 0}; // PageFile{1, 0} is ready to be migrated by gc + PageStorage::GcLivesPages livesPages{{id_and_lvl, + {buf_sz, + { + 1, + }}}}; + PageStorage::GcCandidates candidates{ + id_and_lvl, + }; + auto s0 = storage->getSnapshot(); + const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log); + PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates, 1); + auto live_files = storage->versioned_page_entries.gcApply(edit); + storage->gcRemoveObsoleteFiles(page_files, {2, 0}, live_files); + + // After migrate, RefPage 3 -> 1 is still valid + bool exist = false; + for (const auto & rec : edit.getRecords()) + { + if (rec.type == WriteBatch::WriteType::REF && rec.page_id == 3 && rec.ori_page_id == 1) + { + exist = true; + break; + } + } + ASSERT_TRUE(exist); + s0.reset(); + + // reopen PageStorage, RefPage 3 -> 1 is still valid + storage = reopenWithConfig(config); + auto s1 = storage->getSnapshot(); + + auto [is_ref, normal_page_id] = s1->version()->isRefId(3); + ASSERT_TRUE(is_ref); + ASSERT_EQ(normal_page_id, 1UL); + + std::tie(is_ref, normal_page_id) = s1->version()->isRefId(2); + ASSERT_TRUE(is_ref); + ASSERT_EQ(normal_page_id, 1UL); + + // Page 1 is deleted. + auto entry1 = s1->version()->find(1); + ASSERT_FALSE(entry1); + + // Normal page 1 is moved to PageFile_1_1 + entry1 = s1->version()->findNormalPageEntry(normal_page_id); + ASSERT_TRUE(entry1); + ASSERT_EQ(entry1->fileIdLevel().first, 1UL); + ASSERT_EQ(entry1->fileIdLevel().second, 1UL); + + Page page = storage->read(3, s1); + ASSERT_EQ(page.data.size(), buf_sz); + + page = storage->read(2, s1); + ASSERT_EQ(page.data.size(), buf_sz); +} + TEST_F(PageStorage_test, GcMoveRefPage) { const size_t buf_sz = 256; @@ -301,7 +376,7 @@ TEST_F(PageStorage_test, GcMoveRefPage) id_and_lvl, }; auto s0 = storage->getSnapshot(); - PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates); + PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates, 2); // After migrate, RefPage 3 -> 1 is still valid bool exist = false; @@ -358,7 +433,7 @@ TEST_F(PageStorage_test, GcMovePageDelMeta) }; const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log); auto s0 = storage->getSnapshot(); - PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates); + PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates, 2); // We should see migration of DelPage1 bool exist = false; diff --git a/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp b/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp index 5dc886f82ba..5d5b31fa962 100644 --- a/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp +++ b/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp @@ -111,7 +111,7 @@ int main(int argc, char ** argv) auto page_ids = snapshot->version()->validPageIds(); for (auto page_id : page_ids) { - const DB::PageEntry * entry = snapshot->version()->find(page_id); + const auto entry = snapshot->version()->find(page_id); printPageEntry(page_id, *entry); } #if 0