Skip to content

Commit

Permalink
PageStorage: Fix TiFlash may fail to restart after FAP is enabled (#9319)
Browse files Browse the repository at this point in the history

close #9307

PageStorage: Fix TiFlash may fail to restart after FAP is enabled
  • Loading branch information
JaySon-Huang authored Aug 15, 2024
1 parent 76b7f36 commit 7d5f4ce
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 22 deletions.
34 changes: 25 additions & 9 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,23 @@ void VersionedPageEntries<Trait>::createDelete(const PageVersion & ver) NO_THREA
}

template <typename Trait>
bool VersionedPageEntries<Trait>::updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry)
NO_THREAD_SAFETY_ANALYSIS
bool VersionedPageEntries<Trait>::updateLocalCacheForRemotePage(
const PageVersion & ver,
const PageEntryV3 & entry,
bool ignore_delete) NO_THREAD_SAFETY_ANALYSIS
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
{
auto last_iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1, 0));
if (ignore_delete)
{
// Walk backwards to the nearest non-delete entry; stop at the first entry if no such entry is found before it
while (last_iter != entries.end() && last_iter != entries.begin() && last_iter->second.isDelete())
{
--last_iter;
}
}
RUNTIME_CHECK_MSG(
last_iter != entries.end() && last_iter->second.isEntry(),
"this={}, entries={}, ver={}, entry={}",
Expand Down Expand Up @@ -470,8 +480,9 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
}

template <typename Trait>
std::tuple<ResolveResult, typename VersionedPageEntries<Trait>::PageId, PageVersion> VersionedPageEntries<
Trait>::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry) NO_THREAD_SAFETY_ANALYSIS
std::tuple<ResolveResult, typename VersionedPageEntries<Trait>::PageId, PageVersion> //
VersionedPageEntries<Trait>::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry)
NO_THREAD_SAFETY_ANALYSIS
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
Expand Down Expand Up @@ -1819,13 +1830,18 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::updateLocalCach
auto iter = mvcc_table_directory.lower_bound(id_to_resolve);
assert(iter != mvcc_table_directory.end());
auto & version_list = iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
// We need to ignore the "deletes" both when resolving the page id and when
// updating the local cache. Check `PageDirectory::getByIDImpl` or the unit test
// `UniPageStorageRemoteReadTest.WriteReadRefWithRestart` for details.
const bool ignore_delete = id_to_resolve != r.page_id;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve]
= version_list->resolveToPageId(sequence_to_resolve, ignore_delete, nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
{
if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry))
if (!version_list->updateLocalCacheForRemotePage(
PageVersion(sequence_to_resolve, 0),
r.entry,
ignore_delete))
{
// The entry is not valid for updating the version_list.
// Caller should notice these part of "ignored_entries" and release
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ struct EntryOrDelete
PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_sub(sizeof(PageEntryV3));
}

static EntryOrDelete newDelete() { return EntryOrDelete(std::nullopt); };
static EntryOrDelete newDelete() { return EntryOrDelete(std::nullopt); }
static EntryOrDelete newNormalEntry(const PageEntryV3 & entry) { return EntryOrDelete(entry); }
static EntryOrDelete newReplacingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry)
{
Expand Down Expand Up @@ -320,7 +320,7 @@ class VersionedPageEntries

// Update the local cache info for a remote page.
// The caller must hold a snapshot to prevent the page from being deleted.
bool updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry);
bool updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry, bool ignore_delete);

std::shared_ptr<PageId> fromRestored(const typename PageEntriesEdit::EditRecord & rec);

Expand Down
15 changes: 10 additions & 5 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,18 @@ void PageDirectoryFactory<Trait>::applyRecord(
while (true)
{
const auto & current_version_list = version_list_iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = current_version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
// We need to ignore the "deletes" both when resolving the page id and when
// updating the local cache. Check `PageDirectory::getByIDImpl` or the unit test
// `UniPageStorageRemoteReadTest.WriteReadRefWithRestart` for details.
const bool ignore_delete = id_to_resolve != r.page_id;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve]
= current_version_list->resolveToPageId(sequence_to_resolve, ignore_delete, nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
{
current_version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry);
current_version_list->updateLocalCacheForRemotePage(
PageVersion(sequence_to_resolve, 0),
r.entry,
ignore_delete);
break;
}
else if (resolve_state == ResolveResult::TO_REF)
Expand Down
108 changes: 108 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,114 @@ try
}
CATCH

// Regression test: a remote page is referenced through a chain of ref pages
// (test_page_id <- "xxx_a" <- {"xxx_a_a", "xxx_a_b"}) and every id except the
// tail refs is deleted, each in its own write batch. The tail refs must still
// return the remote data, both before and after `reload()`.
TEST_P(UniPageStorageRemoteReadTest, WriteReadRefWithRestart)
try
{
const String test_page_id = "aaabbb";
/// Prepare data on remote store
auto writer = PS::V3::CPFilesWriter::create({
.data_file_path_pattern = data_file_path_pattern,
.data_file_id_pattern = data_file_id_pattern,
.manifest_file_path = manifest_file_path,
.manifest_file_id = manifest_file_id,
.data_source = PS::V3::CPWriteDataSourceFixture::create({{10, "nahida opened her eyes"}}),
});

writer->writePrefix({
.writer = {},
.sequence = 5,
.last_sequence = 3,
});
{
auto edits = PS::V3::universal::PageEntriesEdit{};
// size = 22 is the length of "nahida opened her eyes".
// NOTE(review): offset = 10 presumably matches the key `10` used in the
// data source fixture above -- confirm against CPWriteDataSourceFixture.
edits.appendRecord(
{.type = PS::V3::EditRecordType::VAR_ENTRY, .page_id = test_page_id, .entry = {.size = 22, .offset = 10}});
writer->writeEditsAndApplyCheckpointInfo(edits);
}
auto data_paths = writer->writeSuffix();
writer.reset();
// Upload every data file and then the manifest.
for (const auto & data_path : data_paths)
{
uploadFile(data_path);
}
uploadFile(manifest_file_path);

/// Put remote page into local
auto manifest_file = PosixRandomAccessFile::create(manifest_file_path);
auto manifest_reader = PS::V3::CPManifestFileReader::create({
.plain_file = manifest_file,
});
manifest_reader->readPrefix();
PS::V3::CheckpointProto::StringsInternMap im;
{
auto edits_r = manifest_reader->readEdits(im);
auto r = edits_r->getRecords();
// Exactly one record was written to the manifest above.
ASSERT_EQ(1, r.size());

// Write the page read back from the manifest into the local page storage
// as a remote page, using the data location recorded in the checkpoint info.
UniversalWriteBatch wb;
wb.disableRemoteLock();
wb.putRemotePage(
r[0].page_id,
0,
r[0].entry.size,
r[0].entry.checkpoint_info.data_location,
std::move(r[0].entry.field_offsets));
page_storage->write(std::move(wb));
}

/// Ref page and delete the original page
{
UniversalWriteBatch wb;
wb.putRefPage("xxx_a", test_page_id);
page_storage->write(std::move(wb));
}
// delete in another wb
{
UniversalWriteBatch wb;
wb.delPage(test_page_id);
page_storage->write(std::move(wb));
}
// Ref again
{
UniversalWriteBatch wb;
wb.putRefPage("xxx_a_a", "xxx_a");
wb.putRefPage("xxx_a_b", "xxx_a");
page_storage->write(std::move(wb));
}
// delete in another wb
{
UniversalWriteBatch wb;
wb.delPage("xxx_a");
page_storage->write(std::move(wb));
}

// read the tail ref page
{
auto page = page_storage->read("xxx_a_a");
ASSERT_TRUE(page.isValid());
ASSERT_EQ("nahida opened her eyes", String(page.data.begin(), page.data.size()));
}

// Mock restart
reload();

{
// ensure the page is deleted as expected
// NOTE(review): the trailing `false` argument presumably disables throwing
// when the page does not exist -- confirm against the read() signature.
auto page = page_storage->read(test_page_id, nullptr, nullptr, false);
ASSERT_FALSE(page.isValid());
page = page_storage->read("xxx_a", nullptr, nullptr, false);
ASSERT_FALSE(page.isValid());
// read by the ref_id
auto page_ref = page_storage->read("xxx_a_a");
ASSERT_TRUE(page_ref.isValid());
ASSERT_EQ("nahida opened her eyes", String(page_ref.data.begin(), page_ref.data.size()));
page_ref = page_storage->read("xxx_a_b");
ASSERT_TRUE(page_ref.isValid());
ASSERT_EQ("nahida opened her eyes", String(page_ref.data.begin(), page_ref.data.size()));
}
}
CATCH

TEST_P(UniPageStorageRemoteReadTest, MultiThreadReadUpdateRmotePage)
try
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ LogReaderPtr WALStoreReader::createLogReader(
const auto fullname = filename.fullname(filename.stage);
Poco::File f(fullname);
const auto file_size = f.getSize();
LOG_DEBUG(logger, "Open log file for reading, file={} size={}", fullname, file_size);
LOG_INFO(logger, "Open log file for reading, file={} size={}", fullname, file_size);
auto read_buf = ReadBufferFromRandomAccessFileBuilder::buildPtr(
provider,
fullname,
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include <Storages/PathPool.h>
#include <common/logger_useful.h>

#include <cassert>
#include <memory>
#include <mutex>

Expand Down Expand Up @@ -140,7 +139,7 @@ std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
auto filename = log_filename.filename(log_filename.stage);
auto fullname = log_filename.fullname(log_filename.stage);
// TODO check whether the file already existed
LOG_INFO(logger, "Creating log file for writing [fullname={}]", fullname);
LOG_INFO(logger, "Creating log file for writing, fullname={}", fullname);
// if it is a temp file, we will manually sync it after writing snapshot
auto log_writer = std::make_unique<LogWriter>(
fullname,
Expand Down Expand Up @@ -255,21 +254,21 @@ bool WALStore::saveSnapshot(
normal_fullname,
EncryptionPath(normal_fullname, ""),
true);
LOG_INFO(logger, "Rename log file to normal done [tempname={}] [fullname={}]", temp_fullname, normal_fullname);
LOG_INFO(logger, "Rename log file to normal done, tempname={} fullname={}", temp_fullname, normal_fullname);

// Remove compacted log files.
removeLogFiles(files_snap.persisted_log_files);

auto get_logging_str = [&]() {
FmtBuffer fmt_buf;
fmt_buf.append("Dumped directory snapshot to log file done. [files_snapshot=");
fmt_buf.append("Dumped directory snapshot to log file done. files_snapshot=");
fmt_buf.joinStr(
files_snap.persisted_log_files.begin(),
files_snap.persisted_log_files.end(),
[](const auto & arg, FmtBuffer & fb) { fb.append(arg.filename(arg.stage)); },
", ");
fmt_buf.fmtAppend(
"] [dump_cost={}] [num_records={}] [file={}] [size={}].",
" dump_cost={} num_records={} file={} size={}",
files_snap.dump_elapsed_ms,
files_snap.num_records,
normal_fullname,
Expand Down
91 changes: 91 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/Page/V3/PageEntryCheckpointInfo.h>
#include <Storages/Page/V3/WAL/serialize.h>
#include <Storages/Page/V3/WALStore.h>
#include <Storages/Page/V3/tests/entries_helper.h>
Expand Down Expand Up @@ -825,6 +826,96 @@ try
}
CATCH

// Builds a three-hop ref chain on top of a page whose entry carries checkpoint
// info: 951 <- 954 <- {972, 985}, and 985 <- {998, 1011}. Pages 951 and 954 are
// deleted along the way. It then applies an `updateRemote` edit for the tail ref 998
// under a snapshot and expects reads through 998 to return the updated entry.
TEST_F(PageDirectoryTest, NewRefAfterDelThreeHopsRemotePage)
try
{
// Fix issue: https://github.com/pingcap/tiflash/issues/9307
// Original entry: no local file location yet (file_id/offset/checksum are 0),
// only the checkpoint location.
PageEntryV3 entry1{
.file_id = 0,
.size = 1024,
.padded_size = 0,
.tag = 0,
.offset = 0,
.checksum = 0x0,
.checkpoint_info = OptionalCheckpointInfo(
CheckpointLocation{
.data_file_id = std::make_shared<const std::string>("s3://path/to/file"),
.offset_in_file = 0xa0,
.size_in_file = 1024},
true,
true),
};

// Put the original page 951.
{
PageEntriesEdit edit;
edit.put(buildV3Id(TEST_NAMESPACE_ID, 951), entry1);
dir->apply(std::move(edit));
}

// First hop: 954 -> 951.
{
PageEntriesEdit edit;
edit.ref(buildV3Id(TEST_NAMESPACE_ID, 954), buildV3Id(TEST_NAMESPACE_ID, 951));
dir->apply(std::move(edit));
}

// Delete the original page; the same id is deleted twice within one edit.
// NOTE(review): the duplicated delete is presumably intentional, to leave
// more than one delete record for 951 -- confirm.
{
PageEntriesEdit edit;
edit.del(buildV3Id(TEST_NAMESPACE_ID, 951));
edit.del(buildV3Id(TEST_NAMESPACE_ID, 951));
dir->apply(std::move(edit));
}

// Second hop: 972 -> 954 and 985 -> 954.
{
PageEntriesEdit edit;
edit.ref(buildV3Id(TEST_NAMESPACE_ID, 972), buildV3Id(TEST_NAMESPACE_ID, 954));
edit.ref(buildV3Id(TEST_NAMESPACE_ID, 985), buildV3Id(TEST_NAMESPACE_ID, 954));
dir->apply(std::move(edit));
}

// Delete the intermediate ref 954.
{
PageEntriesEdit edit;
edit.del(buildV3Id(TEST_NAMESPACE_ID, 954));
dir->apply(std::move(edit));
}

// Third hop: 998 -> 985 and 1011 -> 985.
{
PageEntriesEdit edit;
edit.ref(buildV3Id(TEST_NAMESPACE_ID, 998), buildV3Id(TEST_NAMESPACE_ID, 985));
edit.ref(buildV3Id(TEST_NAMESPACE_ID, 1011), buildV3Id(TEST_NAMESPACE_ID, 985));
dir->apply(std::move(edit));
}

// Through the tail ref 998, the original entry is still visible.
auto snap = dir->createSnapshot();
ASSERT_ENTRY_EQ(entry1, dir, 998, snap);

// Assume we download the data into this file offset
// Same checkpoint location as entry1, but with file_id/offset/checksum filled in.
PageEntryV3 entry2{
.file_id = 11,
.size = 1024,
.padded_size = 0,
.tag = 0,
.offset = 0xf0,
.checksum = 0xabcd,
.checkpoint_info = OptionalCheckpointInfo(
CheckpointLocation{
.data_file_id = std::make_shared<const std::string>("s3://path/to/file"),
.offset_in_file = 0xa0,
.size_in_file = 1024},
true,
true),
};

// Mock that "update remote" after download from remote store by "snap"
{
PageEntriesEdit edit;
edit.updateRemote(buildV3Id(TEST_NAMESPACE_ID, 998), entry2);
dir->updateLocalCacheForRemotePages(std::move(edit), snap, nullptr);
}
// A fresh snapshot must resolve 998 to the updated entry.
snap = dir->createSnapshot();
ASSERT_ENTRY_EQ(entry2, dir, 998, snap);
}
CATCH

TEST_F(PageDirectoryTest, NewRefAfterDelRandom)
try
{
Expand Down

0 comments on commit 7d5f4ce

Please sign in to comment.