Skip to content

Commit

Permalink
[FLASH-528] Fix PageStorage error: Directory not empty (pingcap#263)
Browse files Browse the repository at this point in the history
* fix bug: gc fails to move a normal page entry that is deleted but still referenced by ref-ids

* fix bug: gcMigrate may migrate to the same gc_file again
  • Loading branch information
JaySon-Huang authored and zanmato1984 committed Nov 1, 2019
1 parent e079db0 commit ad0beeb
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ try
if (iter.name == "pk")
{
EXPECT_EQ(c->getInt(i), expected_row_pk++);
std::cerr << "pk:" << c->getInt(i) << std::endl;
//std::cerr << "pk:" << c->getInt(i) << std::endl;
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Storages/Page/PageEntries.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#pragma once

#include <cassert>
#include <optional>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>
#include <optional>

#include <IO/WriteHelpers.h>
#include <common/likely.h>
Expand Down Expand Up @@ -75,6 +75,15 @@ class PageEntriesMixin
}
}

/// Look up `page_id` directly in `normal_pages`.
/// Returns a copy of the stored entry when the id is present,
/// or `std::nullopt` when it is not.
inline std::optional<PageEntry> findNormalPageEntry(PageId page_id) const
{
    const auto found = normal_pages.find(page_id);
    if (found != normal_pages.end())
        return found->second;
    return std::nullopt;
}

inline PageEntry & at(const PageId page_id)
{
PageId normal_page_id = resolveRefId(page_id);
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/Page/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,14 @@ void PageFile::destroy() const
}
}

bool PageFile::isExist() const
{
Poco::File file(folderPath());
Poco::File data_file(dataPath());
Poco::File meta_file(metaPath());
return (file.exists() && data_file.exists() && meta_file.exists());
}

UInt64 PageFile::getDataFileSize() const
{
Poco::File file(dataPath());
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/PageFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class PageFile : public Allocator<false>
bool isValid() const { return file_id; }
UInt64 getDataFileAppendPos() const { return data_file_pos; }
UInt64 getDataFileSize() const;
bool isExist() const;

private:
/// Create a new page file.
Expand Down
44 changes: 31 additions & 13 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,18 @@ bool PageStorage::gc()
}
auto && [valid_size, valid_page_ids_in_file] = file_valid_pages[page_entry->fileIdLevel()];
valid_size += page_entry->size;
valid_page_ids_in_file.emplace_back(page_id);
}
#else
for (auto iter = snapshot->version()->pages_cbegin(); iter != snapshot->version()->pages_cend(); ++iter)
{
const PageId page_id = iter->first;
const PageEntry & page_entry = iter->second;
auto && [valid_size, valid_page_ids_in_file] = file_valid_pages[page_entry.fileIdLevel()];
valid_size += page_entry.size;
#endif
valid_page_ids_in_file.emplace_back(page_id);
}
#endif
}

// Select gc candidate files into `merge_files`
Expand All @@ -393,11 +395,8 @@ bool PageStorage::gc()
return false;
}

LOG_INFO(log,
storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions");

// There are no valid pages to be migrated but valid ref pages, scan over all `merge_files` and do migrate.
gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files);
gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files, migrate_page_count);
}

std::set<PageFileIdAndLevel> live_files;
Expand Down Expand Up @@ -469,15 +468,33 @@ PageStorage::GcCandidates PageStorage::gcSelectCandidateFiles( // keep readable
return merge_files;
}

PageEntriesEdit
PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const
PageEntriesEdit PageStorage::gcMigratePages(const SnapshotPtr & snapshot,
const GcLivesPages & file_valid_pages,
const GcCandidates & merge_files,
const size_t migrate_page_count) const
{
PageEntriesEdit gc_file_edit;

// merge `merge_files` to PageFile which PageId = max of all `merge_files` and level = level + 1
auto [largest_file_id, level] = *(merge_files.rbegin());
PageFile gc_file = PageFile::newPageFile(largest_file_id, level + 1, storage_path, /* is_tmp= */ true, page_file_log);

{
// In case those files are held by a snapshot and gcMigrate runs on the same PageFile again, we need to check whether gc_file already exists.
PageFile gc_file = PageFile::openPageFileForRead(largest_file_id, level + 1, storage_path, page_file_log);
if (gc_file.isExist())
{
LOG_INFO(log, storage_name << " GC migration to PageFile_" << largest_file_id << "_" << level + 1 << " is done before.");
return gc_file_edit;
}
}

// Create a tmp PageFile for migration
PageFile gc_file = PageFile::newPageFile(largest_file_id, level + 1, storage_path, /* is_tmp= */ true, page_file_log);
LOG_INFO(log,
storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count
<< " regions to PageFile_" << largest_file_id << "_" << level + 1);

// We should check these nums, if any of them is non-zero, we should set `gc_file` to formal.
size_t num_successful_migrate_pages = 0;
size_t num_valid_ref_pages = 0;
size_t num_del_page_meta = 0;
Expand Down Expand Up @@ -508,7 +525,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
{
try
{
const auto page_entry = current->find(page_id);
const auto page_entry = current->findNormalPageEntry(page_id);
if (!page_entry)
continue;
// This page is covered by newer file.
Expand Down Expand Up @@ -570,14 +587,15 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
}
} // free gc_file_writer and sync

if (gc_file_edit.empty() && num_valid_ref_pages == 0)
const auto id = gc_file.fileIdLevel();
if (gc_file_edit.empty() && num_valid_ref_pages == 0 && num_del_page_meta == 0)
{
LOG_INFO(log, storage_name << " No valid pages, deleting PageFile_" << id.first << "_" << id.second);
gc_file.destroy();
}
else
{
gc_file.setFormal();
const auto id = gc_file.fileIdLevel();
LOG_INFO(log,
storage_name << " GC have migrated " << num_successful_migrate_pages //
<< " regions and " << num_valid_ref_pages //
Expand All @@ -589,7 +607,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f

/**
* Delete obsolete files that are not used by any version
* @param page_files All avaliable files in disk
* @param page_files All available files in disk
* @param writing_file_id_level The PageFile id which is writing to
* @param live_files The live files after gc
*/
Expand All @@ -607,7 +625,7 @@ void PageStorage::gcRemoveObsoleteFiles(const std::set<PageFile, PageFile::Compa

if (live_files.count(page_id_and_lvl) == 0)
{
// the page file is not used by any version, remove reader cache
// the page file is not used by any version, remove the page file in disk
page_file.destroy();
}
}
Expand Down
16 changes: 9 additions & 7 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ class PageStorage
// gc helper functions
using GcCandidates = std::set<PageFileIdAndLevel>;
using GcLivesPages = std::map<PageFileIdAndLevel, std::pair<size_t, PageIds>>;
GcCandidates gcSelectCandidateFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
const GcLivesPages & file_valid_pages,
const PageFileIdAndLevel & writing_file_id_level,
UInt64 & candidate_total_size,
size_t & migrate_page_count) const;
PageEntriesEdit
gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const;
GcCandidates gcSelectCandidateFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
const GcLivesPages & file_valid_pages,
const PageFileIdAndLevel & writing_file_id_level,
UInt64 & candidate_total_size,
size_t & migrate_page_count) const;
PageEntriesEdit gcMigratePages(const SnapshotPtr & snapshot,
const GcLivesPages & file_valid_pages,
const GcCandidates & merge_files,
size_t migrate_page_count) const;

static void gcRemoveObsoleteFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
const PageFileIdAndLevel & writing_file_id_level,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/VersionSet/PageEntriesBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class PageEntriesBuilder
{
if (rec.type != WriteBatch::WriteType::PUT)
continue;
// Gc only apply PUT for updating page entries
const auto old_page_entry = old_version->find(rec.page_id);
// Gc only apply PUT for updating normal page entries
const auto old_page_entry = old_version->findNormalPageEntry(rec.page_id);
// If the gc page has already been removed, or is a ref to a non-existent page, just ignore it
if (!old_page_entry)
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,27 @@ namespace DB

std::set<PageFileIdAndLevel> PageEntriesVersionSetWithDelta::gcApply(PageEntriesEdit & edit)
{
std::unique_lock lock(read_mutex);

if (current.use_count() == 1 && current->isBase())
{
// If no readers, we could directly merge edits
EditAcceptor::gcApplyInplace(current, edit);
}
else
if (!edit.empty())
{
if (current.use_count() != 1)
std::unique_lock lock(read_mutex);

if (current.use_count() == 1 && current->isBase())
{
VersionPtr v = VersionType::createDelta();
appendVersion(std::move(v));
// If no readers, we could directly merge edits
EditAcceptor::gcApplyInplace(current, edit);
}
else
{
if (current.use_count() != 1)
{
VersionPtr v = VersionType::createDelta();
appendVersion(std::move(v));
}
auto view = std::make_shared<PageEntriesView>(current);
EditAcceptor builder(view.get());
builder.gcApply(edit);
}
auto view = std::make_shared<PageEntriesView>(current);
EditAcceptor builder(view.get());
builder.gcApply(edit);
}

return listAllLiveFiles();
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/VersionSet/PageEntriesView.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PageEntriesView
std::set<PageId> validPageIds() const;

// For iterate over all normal pages
std::set<PageId> validNormalPageIds() const;
std::set<PageId> validNormalPageIds() const;
std::optional<PageEntry> findNormalPageEntry(PageId page_id) const;

PageId maxId() const;
Expand Down
81 changes: 78 additions & 3 deletions dbms/src/Storages/Page/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ TEST_F(PageStorage_test, GcMigrateValidRefPages)
PageStorage::SnapshotPtr snapshot = storage->getSnapshot();
//candidates.insert(PageFileIdAndLevel{1, 0});
candidates.insert(PageFileIdAndLevel{2, 0});
const PageEntriesEdit gc_file_edit = storage->gcMigratePages(snapshot, lives_pages, candidates);
const PageEntriesEdit gc_file_edit = storage->gcMigratePages(snapshot, lives_pages, candidates, 3);
ASSERT_FALSE(gc_file_edit.empty());
// check the ref is migrated.
// check the deleted ref is not migrated.
Expand All @@ -273,6 +273,81 @@ TEST_F(PageStorage_test, GcMigrateValidRefPages)
ASSERT_FALSE(is_deleted_ref_id_exists);
}

// Checks that gc migration still moves a normal page whose own page id has been
// deleted while ref pages keep pointing at it, and that the refs stay readable
// after the storage is reopened.
TEST_F(PageStorage_test, GcMoveNormalPage)
{
const size_t buf_sz = 256;
char c_buff[buf_sz];

{
// One batch: put page 1, make 2 a ref to 1 and 3 a ref to 2, then delete page id 1.
// Only the ref ids 2 and 3 remain as handles to the data written for page 1.
WriteBatch batch;
memset(c_buff, 0xf, buf_sz);
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(1, 0, buff, buf_sz);
batch.putRefPage(2, 1);
batch.putRefPage(3, 2);

batch.delPage(1);

storage->write(batch);
}

// Hand-built gc input: PageFile{1, 0} is the only candidate, and it is declared
// to hold `buf_sz` valid bytes belonging to page id 1.
PageFileIdAndLevel id_and_lvl = {1, 0}; // PageFile{1, 0} is ready to be migrated by gc
PageStorage::GcLivesPages livesPages{{id_and_lvl,
{buf_sz,
{
1,
}}}};
PageStorage::GcCandidates candidates{
id_and_lvl,
};
// Take the snapshot and the file listing before migrating, then run the gc steps
// by hand: migrate (last argument is migrate_page_count), apply the edit, and
// remove obsolete files with {2, 0} passed as the writing file id/level.
auto s0 = storage->getSnapshot();
const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log);
PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates, 1);
auto live_files = storage->versioned_page_entries.gcApply(edit);
storage->gcRemoveObsoleteFiles(page_files, {2, 0}, live_files);

// After migration, the edit must contain a REF record for page 3 pointing at normal page 1
bool exist = false;
for (const auto & rec : edit.getRecords())
{
if (rec.type == WriteBatch::WriteType::REF && rec.page_id == 3 && rec.ori_page_id == 1)
{
exist = true;
break;
}
}
ASSERT_TRUE(exist);
// Drop the pre-gc snapshot before reopening the storage.
s0.reset();

// Reopen PageStorage (presumably restoring state from disk -- confirm in reopenWithConfig);
// RefPage 3 -> 1 must still be valid
storage = reopenWithConfig(config);
auto s1 = storage->getSnapshot();

// Both ref ids must resolve to normal page 1 (3 was written as a ref to 2).
auto [is_ref, normal_page_id] = s1->version()->isRefId(3);
ASSERT_TRUE(is_ref);
ASSERT_EQ(normal_page_id, 1UL);

std::tie(is_ref, normal_page_id) = s1->version()->isRefId(2);
ASSERT_TRUE(is_ref);
ASSERT_EQ(normal_page_id, 1UL);

// Page id 1 itself is deleted, so a lookup by page id finds nothing.
auto entry1 = s1->version()->find(1);
ASSERT_FALSE(entry1);

// The normal page entry for id 1 still exists and now lives in PageFile_1_1
// (file id 1, level 1), i.e. it was moved by the migration.
entry1 = s1->version()->findNormalPageEntry(normal_page_id);
ASSERT_TRUE(entry1);
ASSERT_EQ(entry1->fileIdLevel().first, 1UL);
ASSERT_EQ(entry1->fileIdLevel().second, 1UL);

// Reading through either ref id must return the full buf_sz bytes.
Page page = storage->read(3, s1);
ASSERT_EQ(page.data.size(), buf_sz);

page = storage->read(2, s1);
ASSERT_EQ(page.data.size(), buf_sz);
}

TEST_F(PageStorage_test, GcMoveRefPage)
{
const size_t buf_sz = 256;
Expand Down Expand Up @@ -301,7 +376,7 @@ TEST_F(PageStorage_test, GcMoveRefPage)
id_and_lvl,
};
auto s0 = storage->getSnapshot();
PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates);
PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates, 2);

// After migrate, RefPage 3 -> 1 is still valid
bool exist = false;
Expand Down Expand Up @@ -358,7 +433,7 @@ TEST_F(PageStorage_test, GcMovePageDelMeta)
};
const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log);
auto s0 = storage->getSnapshot();
PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates);
PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates, 2);

// We should see migration of DelPage1
bool exist = false;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ int main(int argc, char ** argv)
auto page_ids = snapshot->version()->validPageIds();
for (auto page_id : page_ids)
{
const DB::PageEntry * entry = snapshot->version()->find(page_id);
const auto entry = snapshot->version()->find(page_id);
printPageEntry(page_id, *entry);
}
#if 0
Expand Down

0 comments on commit ad0beeb

Please sign in to comment.