Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix restore problems in PageStorage. #4350

Merged
merged 9 commits into from
Mar 21, 2022
27 changes: 26 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 *
return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersionType(0)};
}

std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq, bool allow_forward) const
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
Expand All @@ -348,7 +348,32 @@ std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
{
// NORMAL
if (iter->second.isEntry())
{
return iter->second.entry;
}
else if (allow_forward) // iter->second.isDelete()
{
if (iter == entries.begin())
{
throw Exception(fmt::format("Can't restore the entries [seq={}]", seq));
}

// Check (Begin entry, last entry - 1]
while (iter != entries.begin())
{
iter--;
if (iter->second.isEntry())
{
return iter->second.entry;
}
}

// Check Begin entry
if (iter->second.isEntry())
{
return iter->second.entry;
}
}
}
}
return std::nullopt;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class VersionedPageEntries

Int64 incrRefCount(const PageVersionType & ver);

std::optional<PageEntryV3> getEntry(UInt64 seq) const;
std::optional<PageEntryV3> getEntry(UInt64 seq, bool allow_forward = false) const;

/**
* If there are entries point to file in `blob_ids`, take out the <page_id, ver, entry> and
Expand Down
36 changes: 35 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
auto [wal, reader] = WALStore::create(file_provider, delegator);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadFromDisk(dir, std::move(reader));

// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

// After we restore from disk.
// We need cleanup all invalid entries in memory.
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
dir->gcInMemEntries();

if (blob_stats)
{
// After all entries restored to `mvcc_table_directory`, only apply
Expand All @@ -38,7 +43,36 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
for (const auto & [page_id, entries] : dir->mvcc_table_directory)
{
(void)page_id;
if (auto entry = entries->getEntry(max_applied_ver.sequence); entry)

// We must allow entry forward search.
// Otherwise, It is likely to cause data loss.
// for example:
//
// page id 4927
// {type:5, create_ver: <0,0>, is_deleted: false, delete_ver: <0,0>, ori_page_id: 0.0, being_ref_count: 1, num_entries: 2}
// entry 0
// sequence: 1802
// epoch: 0
// is del: false
// blob id: 5
// offset: 77661628
// size: 2381165
// crc: 0x1D1BEF504F12D3A0
// field offset size: 0
// entry 1
// sequence: 2121
// epoch: 0
// is del: true
// blob id: 0
// offset: 0
// size: 0
// crc: 0x0
// field offset size: 0
// page id 5819
// {type:6, create_ver: <2090,0>, is_deleted: false, delete_ver: <0,0>, ori_page_id: 0.4927, being_ref_count: 1, num_entries: 0}
//
// After getEntry, page id `4927` won't be restore by BlobStore.
if (auto entry = entries->getEntry(max_applied_ver.sequence, true); entry)
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
blob_stats->restoreByEntry(*entry);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ PageStorageImpl::~PageStorageImpl() = default;

void PageStorageImpl::restore()
{
// TODO: clean up blobstore.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What thing do you plan to optimize by adding this "TODO" comment? Can you explain more?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is such an extreme example.
Before shutdown:

blob 1 : valid entries 10
blob 2 : valid entries 100
blob 3 : valid entries 100

after shutdown, Then we do the restore:

blob 1 : all of entries is invalid 
blob 2 : valid entries less than 100, but still exist valid entry
blob 3 : valid entries less than 100, but still exist valid entry

Then blob 1 won't restored, because it has not any valid entries.
But at this time, we also won't remove it in the disk. Because it is not created on memory. So we need add a scan after we restored pagestorage. Then removed blob 1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get it

// TODO: Speedup restoring
PageDirectoryFactory factory;
page_directory = factory
Expand Down
39 changes: 39 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,5 +900,44 @@ try
CATCH


TEST_F(PageStorageTest, readRefAfterRestore)
try
{
const size_t buf_sz = 1024;
char c_buff[buf_sz];

for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}

{
WriteBatch batch;
batch.putPage(1, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, PageFieldSizes{{32, 64, 79, 128, 196, 256, 269}});
batch.putRefPage(3, 1);
batch.delPage(1);
batch.putPage(4, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

page_storage = reopenWithConfig(config);

{
WriteBatch batch;
memset(c_buff, 0, buf_sz);
batch.putPage(5, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

std::vector<PageStorage::PageReadFields> fields;
PageStorage::PageReadFields field;
field.first = 3;
field.second = {0, 1, 2, 3, 4, 5, 6};
fields.emplace_back(field);

ASSERT_NO_THROW(page_storage->read(fields));
}
CATCH

} // namespace PS::V3::tests
} // namespace DB