Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix restore problems in PageStorage. #4350

Merged
merged 9 commits into from
Mar 21, 2022
16 changes: 16 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,22 @@ std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
return std::nullopt;
}

/// Returns a copy of the first record in `entries`, walking in reverse order, for which
/// `isEntry()` is true. No sequence number is taken into account.
/// Returns `std::nullopt` when `type` is not `EditRecordType::VAR_ENTRY` or when no
/// record in `entries` is an entry.
std::optional<PageEntryV3> VersionedPageEntries::getLastEntry() const
{
    // Hold the page lock for the whole lookup.
    auto page_lock = acquireLock();

    if (type != EditRecordType::VAR_ENTRY)
        return std::nullopt;

    // Walk backwards and stop at the first record that carries an entry.
    for (auto rev_it = entries.rbegin(); rev_it != entries.rend(); ++rev_it)
    {
        const auto & versioned = rev_it->second;
        if (versioned.isEntry())
            return versioned.entry;
    }

    return std::nullopt;
}

Int64 VersionedPageEntries::incrRefCount(const PageVersionType & ver)
{
auto page_lock = acquireLock();
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class VersionedPageEntries

std::optional<PageEntryV3> getEntry(UInt64 seq) const;

/**
 * Return the first record found when walking `entries` in reverse order that is an
 * entry, without filtering by sequence. Return `std::nullopt` if this object is not
 * of type `VAR_ENTRY` or holds no such record.
 */
std::optional<PageEntryV3> getLastEntry() const;

/**
* If there are entries point to file in `blob_ids`, take out the <page_id, ver, entry> and
* store them into `blob_versioned_entries`.
Expand Down
40 changes: 37 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
auto [wal, reader] = WALStore::create(file_provider, delegator);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadFromDisk(dir, std::move(reader));

// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

// After restoring from the disk, we need cleanup all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();

if (blob_stats)
{
// After all entries restored to `mvcc_table_directory`, only apply
Expand All @@ -39,7 +44,11 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
for (const auto & [page_id, entries] : dir->mvcc_table_directory)
{
(void)page_id;
if (auto entry = entries->getEntry(max_applied_ver.sequence); entry)

// We should restore the entry to `blob_stats` even if it is marked as "deleted",
// or we will mistakenly reuse the space to write other blobs down into that space.
// So we need to use `getLastEntry` instead of `getEntry(version)` here.
if (auto entry = entries->getLastEntry(); entry)
{
blob_stats->restoreByEntry(*entry);
}
Expand All @@ -58,10 +67,35 @@ PageDirectoryPtr PageDirectoryFactory::createFromEdit(FileProviderPtr & file_pro
(void)reader;
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadEdit(dir, edit);
if (blob_stats)
blob_stats->restore();
// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

// After restoring from the disk, we need cleanup all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();

if (blob_stats)
{
// After all entries restored to `mvcc_table_directory`, only apply
// the latest entry to `blob_stats`, or we may meet error since
// some entries may be removed in memory but not get compacted
// in the log file.
for (const auto & [page_id, entries] : dir->mvcc_table_directory)
{
(void)page_id;

// We should restore the entry to `blob_stats` even if it is marked as "deleted",
// or we will mistakenly reuse the space to write other blobs down into that space.
// So we need to use `getLastEntry` instead of `getEntry(version)` here.
if (auto entry = entries->getLastEntry(); entry)
{
blob_stats->restoreByEntry(*entry);
}
}

blob_stats->restore();
}

return dir;
}

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class PageDirectoryFactory

// just for test
PageDirectoryPtr createFromEdit(FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit);
// just for test
// Stores the address of `blob_stats_` in `blob_stats` and returns `*this`, so the call can be
// chained, e.g. `factory.setBlobStats(stats).createFromEdit(...)`.
// NOTE(review): only a pointer is kept, so `blob_stats_` presumably has to stay alive while
// the factory is used -- confirm against callers.
PageDirectoryFactory & setBlobStats(BlobStore::BlobStats & blob_stats_)
{
blob_stats = &blob_stats_;
return *this;
}

private:
void loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ PageStorageImpl::~PageStorageImpl() = default;

void PageStorageImpl::restore()
{
// TODO: clean up blobstore.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What thing do you plan to optimize by adding this "TODO" comment? Can you explain more?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the following extreme example.
Before shutdown:

blob 1 : valid entries 10
blob 2 : valid entries 100
blob 3 : valid entries 100

After shutdown, we do the restore:

blob 1 : all entries are invalid
blob 2 : fewer than 100 valid entries, but some valid entries still exist
blob 3 : fewer than 100 valid entries, but some valid entries still exist

Then blob 1 won't be restored, because it has no valid entries.
But at this point we also won't remove it from disk, because it was never created in memory. So we need to add a scan after PageStorage is restored, and then remove blob 1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it

// TODO: Speedup restoring
PageDirectoryFactory factory;
page_directory = factory
Expand Down
58 changes: 57 additions & 1 deletion dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/FmtUtils.h>
#include <Common/LogWithPrefix.h>
#include <Encryption/FileProvider.h>
#include <IO/WriteHelpers.h>
#include <Storages/Page/Page.h>
Expand Down Expand Up @@ -41,6 +42,10 @@ namespace PS::V3::tests
class PageDirectoryTest : public DB::base::TiFlashStorageTestBasic
{
public:
PageDirectoryTest()
: log(getLogWithPrefix(nullptr, "PageDirectoryTest"))
{}

void SetUp() override
{
auto path = getTemporaryPath();
Expand All @@ -55,6 +60,8 @@ class PageDirectoryTest : public DB::base::TiFlashStorageTestBasic

protected:
PageDirectoryPtr dir;

LogWithPrefixPtr log;
};

TEST_F(PageDirectoryTest, ApplyPutRead)
Expand Down Expand Up @@ -1764,7 +1771,6 @@ try
PSDiskDelegatorPtr delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(path);
PageDirectoryFactory factory;
auto d = factory.createFromEdit(provider, delegator, edit);
d->gcInMemEntries();
return d;
};

Expand Down Expand Up @@ -1951,6 +1957,56 @@ try
}
CATCH

// Restores a directory from a dumped edit with a BlobStats attached, then checks two things:
//  - reads through the restored directory: page 2 (a ref to page 1) yields entry_1_v1,
//    page 1 throws, page 5 yields entry_5_v2;
//  - the space maps in BlobStats: entry_1_v1 and entry_5_v2 are marked used, entry_5_v1 is not.
TEST_F(PageDirectoryGCTest, RestoreWithRef)
try
{
PageEntryV3 entry_1_v1{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567};
PageEntryV3 entry_5_v1{.file_id = 5, .size = 255, .tag = 0, .offset = 0x100, .checksum = 0x4567};
PageEntryV3 entry_5_v2{.file_id = 5, .size = 255, .tag = 0, .offset = 0x400, .checksum = 0x4567};
{
// First edit: create page 1 and page 5.
PageEntriesEdit edit;
edit.put(1, entry_1_v1);
edit.put(5, entry_5_v1);
dir->apply(std::move(edit));
}
{
// Second edit: page 2 becomes a ref to page 1, page 1 itself is deleted,
// and page 5 gets a new entry.
PageEntriesEdit edit;
edit.ref(2, 1);
edit.del(1);
edit.put(5, entry_5_v2); // page 5 now has entry_5_v2 on top of entry_5_v1
dir->apply(std::move(edit));
}

// Builds a new directory from `edit` via PageDirectoryFactory::createFromEdit, with `stats`
// attached through setBlobStats so the restore also fills the blob stats.
auto restore_from_edit = [](const PageEntriesEdit & edit, BlobStore::BlobStats & stats) {
auto ctx = ::DB::tests::TiFlashTestEnv::getContext();
auto provider = ctx.getFileProvider();
auto path = getTemporaryPath();
PSDiskDelegatorPtr delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(path);
PageDirectoryFactory factory;
auto d = factory.setBlobStats(stats).createFromEdit(provider, delegator, edit);
return d;
};
{
// Dump the current directory state to an edit and restore from it.
auto snap = dir->createSnapshot();
auto edit = dir->dumpSnapshotToEdit(snap);
BlobStore::BlobStats stats(log, BlobStore::Config{});
auto restored_dir = restore_from_edit(edit, stats);
auto temp_snap = restored_dir->createSnapshot();
// Page 2 still resolves to the entry originally written for page 1.
EXPECT_SAME_ENTRY(entry_1_v1, restored_dir->get(2, temp_snap).second);
// Page 1 was deleted, so reading it directly must fail.
EXPECT_ANY_THROW(restored_dir->get(1, temp_snap));
EXPECT_SAME_ENTRY(entry_5_v2, restored_dir->get(5, temp_snap).second);

// The entry_1_v1 should be restored to stats even though page 1 is deleted
// (page 2 still reads it, see above).
// NOTE(review): the meaning of the two `false` flags of blobIdToStat is not visible here.
auto stat_for_file_1 = stats.blobIdToStat(1, false, false);
EXPECT_TRUE(stat_for_file_1->smap->isMarkUsed(entry_1_v1.offset, entry_1_v1.size));
auto stat_for_file_5 = stats.blobIdToStat(5, false, false);
// entry_5_v1 should not be restored to stats; only the replacing entry_5_v2 is.
EXPECT_FALSE(stat_for_file_5->smap->isMarkUsed(entry_5_v1.offset, entry_5_v1.size));
EXPECT_TRUE(stat_for_file_5->smap->isMarkUsed(entry_5_v2.offset, entry_5_v2.size));
}
}
CATCH

TEST_F(PageDirectoryTest, GetMaxId)
try
{
Expand Down
40 changes: 40 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -905,5 +905,45 @@ try
}
CATCH


// Writes page 1 with fields, makes page 3 a ref to it, deletes page 1, then reopens the
// storage, writes another page, and checks that reading the fields of page 3 does not throw.
// NOTE(review): presumably this guards against the space of the deleted-but-referenced page
// being reused by the write after restore -- confirm against the restore logic.
TEST_F(PageStorageTest, readRefAfterRestore)
try
{
const size_t buf_sz = 1024;
char c_buff[buf_sz];

// Fill the buffer with a repeating byte pattern (i % 0xff).
for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}

{
WriteBatch batch;
// The seven field sizes sum to buf_sz (1024).
batch.putPage(1, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, PageFieldSizes{{32, 64, 79, 128, 196, 256, 269}});
// Page 3 refers to page 1; page 1 is then deleted in the same batch.
batch.putRefPage(3, 1);
batch.delPage(1);
batch.putPage(4, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

// Reopen the storage so the following operations run on restored state.
page_storage = reopenWithConfig(config);

{
// Write one more page, with zeroed content this time, after the restore.
WriteBatch batch;
memset(c_buff, 0, buf_sz);
batch.putPage(5, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

// Request all seven fields (indices 0..6) of page 3.
std::vector<PageStorage::PageReadFields> fields;
PageStorage::PageReadFields field;
field.first = 3;
field.second = {0, 1, 2, 3, 4, 5, 6};
fields.emplace_back(field);

ASSERT_NO_THROW(page_storage->read(fields));
}
CATCH

} // namespace PS::V3::tests
} // namespace DB