Skip to content

Commit

Permalink
Fix restore problems in PageStorage. (#4350)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
jiaqizho authored Mar 21, 2022
1 parent 2bd5626 commit 48c78f9
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 4 deletions.
16 changes: 16 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,22 @@ std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
return std::nullopt;
}

std::optional<PageEntryV3> VersionedPageEntries::getLastEntry() const
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
{
for (auto it_r = entries.rbegin(); it_r != entries.rend(); it_r++)
{
if (it_r->second.isEntry())
{
return it_r->second.entry;
}
}
}
return std::nullopt;
}

Int64 VersionedPageEntries::incrRefCount(const PageVersionType & ver)
{
auto page_lock = acquireLock();
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class VersionedPageEntries

std::optional<PageEntryV3> getEntry(UInt64 seq) const;

std::optional<PageEntryV3> getLastEntry() const;

/**
* If there are entries point to file in `blob_ids`, take out the <page_id, ver, entry> and
* store them into `blob_versioned_entries`.
Expand Down
40 changes: 37 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
auto [wal, reader] = WALStore::create(file_provider, delegator);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadFromDisk(dir, std::move(reader));

// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

// After restoring from the disk, we need cleanup all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();

if (blob_stats)
{
// After all entries restored to `mvcc_table_directory`, only apply
Expand All @@ -39,7 +44,11 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
for (const auto & [page_id, entries] : dir->mvcc_table_directory)
{
(void)page_id;
if (auto entry = entries->getEntry(max_applied_ver.sequence); entry)

// We should restore the entry to `blob_stats` even if it is marked as "deleted",
// or we will mistakenly reuse the space to write other blobs down into that space.
// So we need to use `getLastEntry` instead of `getEntry(version)` here.
if (auto entry = entries->getLastEntry(); entry)
{
blob_stats->restoreByEntry(*entry);
}
Expand All @@ -58,10 +67,35 @@ PageDirectoryPtr PageDirectoryFactory::createFromEdit(FileProviderPtr & file_pro
(void)reader;
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadEdit(dir, edit);
if (blob_stats)
blob_stats->restore();
// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

// After restoring from the disk, we need cleanup all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();

if (blob_stats)
{
// After all entries restored to `mvcc_table_directory`, only apply
// the latest entry to `blob_stats`, or we may meet error since
// some entries may be removed in memory but not get compacted
// in the log file.
for (const auto & [page_id, entries] : dir->mvcc_table_directory)
{
(void)page_id;

// We should restore the entry to `blob_stats` even if it is marked as "deleted",
// or we will mistakenly reuse the space to write other blobs down into that space.
// So we need to use `getLastEntry` instead of `getEntry(version)` here.
if (auto entry = entries->getLastEntry(); entry)
{
blob_stats->restoreByEntry(*entry);
}
}

blob_stats->restore();
}

return dir;
}

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class PageDirectoryFactory

// just for test
PageDirectoryPtr createFromEdit(FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit);
// just for test
PageDirectoryFactory & setBlobStats(BlobStore::BlobStats & blob_stats_)
{
blob_stats = &blob_stats_;
return *this;
}

private:
void loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ PageStorageImpl::~PageStorageImpl() = default;

void PageStorageImpl::restore()
{
// TODO: clean up blobstore.
// TODO: Speedup restoring
PageDirectoryFactory factory;
page_directory = factory
Expand Down
58 changes: 57 additions & 1 deletion dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/FmtUtils.h>
#include <Common/LogWithPrefix.h>
#include <Encryption/FileProvider.h>
#include <IO/WriteHelpers.h>
#include <Storages/Page/Page.h>
Expand Down Expand Up @@ -41,6 +42,10 @@ namespace PS::V3::tests
class PageDirectoryTest : public DB::base::TiFlashStorageTestBasic
{
public:
PageDirectoryTest()
: log(getLogWithPrefix(nullptr, "PageDirectoryTest"))
{}

void SetUp() override
{
auto path = getTemporaryPath();
Expand All @@ -55,6 +60,8 @@ class PageDirectoryTest : public DB::base::TiFlashStorageTestBasic

protected:
PageDirectoryPtr dir;

LogWithPrefixPtr log;
};

TEST_F(PageDirectoryTest, ApplyPutRead)
Expand Down Expand Up @@ -1764,7 +1771,6 @@ try
PSDiskDelegatorPtr delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(path);
PageDirectoryFactory factory;
auto d = factory.createFromEdit(provider, delegator, edit);
d->gcInMemEntries();
return d;
};

Expand Down Expand Up @@ -1951,6 +1957,56 @@ try
}
CATCH

TEST_F(PageDirectoryGCTest, RestoreWithRef)
try
{
PageEntryV3 entry_1_v1{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567};
PageEntryV3 entry_5_v1{.file_id = 5, .size = 255, .tag = 0, .offset = 0x100, .checksum = 0x4567};
PageEntryV3 entry_5_v2{.file_id = 5, .size = 255, .tag = 0, .offset = 0x400, .checksum = 0x4567};
{
PageEntriesEdit edit;
edit.put(1, entry_1_v1);
edit.put(5, entry_5_v1);
dir->apply(std::move(edit));
}
{
PageEntriesEdit edit;
edit.ref(2, 1);
edit.del(1);
edit.put(5, entry_5_v2); // replaced for page 5 entry
dir->apply(std::move(edit));
}

auto restore_from_edit = [](const PageEntriesEdit & edit, BlobStore::BlobStats & stats) {
auto ctx = ::DB::tests::TiFlashTestEnv::getContext();
auto provider = ctx.getFileProvider();
auto path = getTemporaryPath();
PSDiskDelegatorPtr delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(path);
PageDirectoryFactory factory;
auto d = factory.setBlobStats(stats).createFromEdit(provider, delegator, edit);
return d;
};
{
auto snap = dir->createSnapshot();
auto edit = dir->dumpSnapshotToEdit(snap);
BlobStore::BlobStats stats(log, BlobStore::Config{});
auto restored_dir = restore_from_edit(edit, stats);
auto temp_snap = restored_dir->createSnapshot();
EXPECT_SAME_ENTRY(entry_1_v1, restored_dir->get(2, temp_snap).second);
EXPECT_ANY_THROW(restored_dir->get(1, temp_snap));
EXPECT_SAME_ENTRY(entry_5_v2, restored_dir->get(5, temp_snap).second);

// The entry_1_v1 should be restored to stats
auto stat_for_file_1 = stats.blobIdToStat(1, false, false);
EXPECT_TRUE(stat_for_file_1->smap->isMarkUsed(entry_1_v1.offset, entry_1_v1.size));
auto stat_for_file_5 = stats.blobIdToStat(5, false, false);
// entry_5_v1 should not be restored to stats
EXPECT_FALSE(stat_for_file_5->smap->isMarkUsed(entry_5_v1.offset, entry_5_v1.size));
EXPECT_TRUE(stat_for_file_5->smap->isMarkUsed(entry_5_v2.offset, entry_5_v2.size));
}
}
CATCH

TEST_F(PageDirectoryTest, GetMaxId)
try
{
Expand Down
40 changes: 40 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -905,5 +905,45 @@ try
}
CATCH


TEST_F(PageStorageTest, readRefAfterRestore)
try
{
const size_t buf_sz = 1024;
char c_buff[buf_sz];

for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}

{
WriteBatch batch;
batch.putPage(1, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, PageFieldSizes{{32, 64, 79, 128, 196, 256, 269}});
batch.putRefPage(3, 1);
batch.delPage(1);
batch.putPage(4, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

page_storage = reopenWithConfig(config);

{
WriteBatch batch;
memset(c_buff, 0, buf_sz);
batch.putPage(5, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

std::vector<PageStorage::PageReadFields> fields;
PageStorage::PageReadFields field;
field.first = 3;
field.second = {0, 1, 2, 3, 4, 5, 6};
fields.emplace_back(field);

ASSERT_NO_THROW(page_storage->read(fields));
}
CATCH

} // namespace PS::V3::tests
} // namespace DB

0 comments on commit 48c78f9

Please sign in to comment.