diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index c53dc0f2d8d..113c1b99ddb 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -331,13 +331,23 @@ void VersionedPageEntries::createDelete(const PageVersion & ver) NO_THREA } template -bool VersionedPageEntries::updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry) - NO_THREAD_SAFETY_ANALYSIS +bool VersionedPageEntries::updateLocalCacheForRemotePage( + const PageVersion & ver, + const PageEntryV3 & entry, + bool ignore_delete) NO_THREAD_SAFETY_ANALYSIS { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) { auto last_iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1, 0)); + if (ignore_delete) + { + // find the first non-delete entry until it is the first entry or there is no such entry + while (last_iter != entries.end() && last_iter != entries.begin() && last_iter->second.isDelete()) + { + --last_iter; + } + } RUNTIME_CHECK_MSG( last_iter != entries.end() && last_iter->second.isEntry(), "this={}, entries={}, ver={}, entry={}", @@ -470,8 +480,9 @@ std::shared_ptr::PageId> VersionedPageEntri } template -std::tuple::PageId, PageVersion> VersionedPageEntries< - Trait>::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry) NO_THREAD_SAFETY_ANALYSIS +std::tuple::PageId, PageVersion> // +VersionedPageEntries::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry) + NO_THREAD_SAFETY_ANALYSIS { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) @@ -1819,13 +1830,18 @@ typename PageDirectory::PageEntries PageDirectory::updateLocalCach auto iter = mvcc_table_directory.lower_bound(id_to_resolve); assert(iter != mvcc_table_directory.end()); auto & version_list = iter->second; - auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId( - sequence_to_resolve, - /*ignore_delete=*/id_to_resolve != r.page_id, - nullptr); + // We need to ignore the "deletes" both when resolve page id and update local cache. + // Check `PageDirectory::getByIDImpl` or the unit test + // `UniPageStorageRemoteReadTest.WriteReadRefWithRestart` for details. + const bool ignore_delete = id_to_resolve != r.page_id; + auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] + = version_list->resolveToPageId(sequence_to_resolve, ignore_delete, nullptr); if (resolve_state == ResolveResult::TO_NORMAL) { - if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry)) + if (!version_list->updateLocalCacheForRemotePage( + PageVersion(sequence_to_resolve, 0), + r.entry, + ignore_delete)) { // The entry is not valid for updating the version_list. // Caller should notice these part of "ignored_entries" and release diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 5103e327ddc..ce79e8bd204 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -253,7 +253,7 @@ struct EntryOrDelete PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_sub(sizeof(PageEntryV3)); } - static EntryOrDelete newDelete() { return EntryOrDelete(std::nullopt); }; + static EntryOrDelete newDelete() { return EntryOrDelete(std::nullopt); } static EntryOrDelete newNormalEntry(const PageEntryV3 & entry) { return EntryOrDelete(entry); } static EntryOrDelete newReplacingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry) { @@ -320,7 +320,7 @@ class VersionedPageEntries // Update the local cache info for remote page, // Must a hold snap to prevent the page being deleted. - bool updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry); + bool updateLocalCacheForRemotePage(const PageVersion & ver, const PageEntryV3 & entry, bool ignore_delete); std::shared_ptr fromRestored(const typename PageEntriesEdit::EditRecord & rec); diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 9dbcfc47bfa..c93bc32cb2b 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -290,13 +290,18 @@ void PageDirectoryFactory::applyRecord( while (true) { const auto & current_version_list = version_list_iter->second; - auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = current_version_list->resolveToPageId( - sequence_to_resolve, - /*ignore_delete=*/id_to_resolve != r.page_id, - nullptr); + // We need to ignore the "deletes" both when resolve page id and update local cache. + // Check `PageDirectory::getByIDImpl` or the unit test + // `UniPageStorageRemoteReadTest.WriteReadRefWithRestart` for details. + const bool ignore_delete = id_to_resolve != r.page_id; + auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] + = current_version_list->resolveToPageId(sequence_to_resolve, ignore_delete, nullptr); if (resolve_state == ResolveResult::TO_NORMAL) { - current_version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry); + current_version_list->updateLocalCacheForRemotePage( + PageVersion(sequence_to_resolve, 0), + r.entry, + ignore_delete); break; } else if (resolve_state == ResolveResult::TO_REF) diff --git a/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp b/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp index 814bd12b1f1..836f7888d8f 100644 --- a/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp +++ b/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp @@ -368,6 +368,114 @@ try } CATCH +TEST_P(UniPageStorageRemoteReadTest, WriteReadRefWithRestart) +try +{ + const String test_page_id = "aaabbb"; + /// Prepare data on remote store + auto writer = PS::V3::CPFilesWriter::create({ + .data_file_path_pattern = data_file_path_pattern, + .data_file_id_pattern = data_file_id_pattern, + .manifest_file_path = manifest_file_path, + .manifest_file_id = manifest_file_id, + .data_source = PS::V3::CPWriteDataSourceFixture::create({{10, "nahida opened her eyes"}}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + { + auto edits = PS::V3::universal::PageEntriesEdit{}; + edits.appendRecord( + {.type = PS::V3::EditRecordType::VAR_ENTRY, .page_id = test_page_id, .entry = {.size = 22, .offset = 10}}); + writer->writeEditsAndApplyCheckpointInfo(edits); + } + auto data_paths = writer->writeSuffix(); + writer.reset(); + for (const auto & data_path : data_paths) + { + uploadFile(data_path); + } + uploadFile(manifest_file_path); + + /// Put remote page into local + auto manifest_file = PosixRandomAccessFile::create(manifest_file_path); + auto manifest_reader = PS::V3::CPManifestFileReader::create({ + .plain_file = manifest_file, + }); + manifest_reader->readPrefix(); + PS::V3::CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + auto r = edits_r->getRecords(); + ASSERT_EQ(1, r.size()); + + UniversalWriteBatch wb; + wb.disableRemoteLock(); + wb.putRemotePage( + r[0].page_id, + 0, + r[0].entry.size, + r[0].entry.checkpoint_info.data_location, + std::move(r[0].entry.field_offsets)); + page_storage->write(std::move(wb)); + } + + /// Ref page and delete the original page + { + UniversalWriteBatch wb; + wb.putRefPage("xxx_a", test_page_id); + page_storage->write(std::move(wb)); + } + // delete in another wb + { + UniversalWriteBatch wb; + wb.delPage(test_page_id); + page_storage->write(std::move(wb)); + } + // Ref again + { + UniversalWriteBatch wb; + wb.putRefPage("xxx_a_a", "xxx_a"); + wb.putRefPage("xxx_a_b", "xxx_a"); + page_storage->write(std::move(wb)); + } + // delete in another wb + { + UniversalWriteBatch wb; + wb.delPage("xxx_a"); + page_storage->write(std::move(wb)); + } + + // read the tail ref page + { + auto page = page_storage->read("xxx_a_a"); + ASSERT_TRUE(page.isValid()); + ASSERT_EQ("nahida opened her eyes", String(page.data.begin(), page.data.size())); + } + + // Mock restart + reload(); + + { + // ensure the page is deleted as expected + auto page = page_storage->read(test_page_id, nullptr, nullptr, false); + ASSERT_FALSE(page.isValid()); + page = page_storage->read("xxx_a", nullptr, nullptr, false); + ASSERT_FALSE(page.isValid()); + // read by the ref_id + auto page_ref = page_storage->read("xxx_a_a"); + ASSERT_TRUE(page_ref.isValid()); + ASSERT_EQ("nahida opened her eyes", String(page_ref.data.begin(), page_ref.data.size())); + page_ref = page_storage->read("xxx_a_b"); + ASSERT_TRUE(page_ref.isValid()); + ASSERT_EQ("nahida opened her eyes", String(page_ref.data.begin(), page_ref.data.size())); + } +} +CATCH + TEST_P(UniPageStorageRemoteReadTest, MultiThreadReadUpdateRmotePage) try { diff --git a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp index cd4abaecbe0..c45f8b61da2 100644 --- a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp +++ b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp @@ -173,7 +173,7 @@ LogReaderPtr WALStoreReader::createLogReader( const auto fullname = filename.fullname(filename.stage); Poco::File f(fullname); const auto file_size = f.getSize(); - LOG_DEBUG(logger, "Open log file for reading, file={} size={}", fullname, file_size); + LOG_INFO(logger, "Open log file for reading, file={} size={}", fullname, file_size); auto read_buf = ReadBufferFromRandomAccessFileBuilder::buildPtr( provider, fullname, diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index 82b74cf7e41..d14a137b22e 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -33,7 +33,6 @@ #include #include -#include #include #include @@ -140,7 +139,7 @@ std::tuple, LogFilename> WALStore::createLogWriter( auto filename = log_filename.filename(log_filename.stage); auto fullname = log_filename.fullname(log_filename.stage); // TODO check whether the file already existed - LOG_INFO(logger, "Creating log file for writing [fullname={}]", fullname); + LOG_INFO(logger, "Creating log file for writing, fullname={}", fullname); // if it is a temp file, we will manually sync it after writing snapshot auto log_writer = std::make_unique( fullname, @@ -255,21 +254,21 @@ bool WALStore::saveSnapshot( normal_fullname, EncryptionPath(normal_fullname, ""), true); - LOG_INFO(logger, "Rename log file to normal done [tempname={}] [fullname={}]", temp_fullname, normal_fullname); + LOG_INFO(logger, "Rename log file to normal done, tempname={} fullname={}", temp_fullname, normal_fullname); // Remove compacted log files. removeLogFiles(files_snap.persisted_log_files); auto get_logging_str = [&]() { FmtBuffer fmt_buf; - fmt_buf.append("Dumped directory snapshot to log file done. [files_snapshot="); + fmt_buf.append("Dumped directory snapshot to log file done. files_snapshot="); fmt_buf.joinStr( files_snap.persisted_log_files.begin(), files_snap.persisted_log_files.end(), [](const auto & arg, FmtBuffer & fb) { fb.append(arg.filename(arg.stage)); }, ", "); fmt_buf.fmtAppend( - "] [dump_cost={}] [num_records={}] [file={}] [size={}].", + " dump_cost={} num_records={} file={} size={}", files_snap.dump_elapsed_ms, files_snap.num_records, normal_fullname, diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 1a835d376a0..c9463271158 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -825,6 +826,96 @@ try } CATCH +TEST_F(PageDirectoryTest, NewRefAfterDelThreeHopsRemotePage) +try +{ + // Fix issue: https://github.com/pingcap/tiflash/issues/9307 + PageEntryV3 entry1{ + .file_id = 0, + .size = 1024, + .padded_size = 0, + .tag = 0, + .offset = 0, + .checksum = 0x0, + .checkpoint_info = OptionalCheckpointInfo( + CheckpointLocation{ + .data_file_id = std::make_shared("s3://path/to/file"), + .offset_in_file = 0xa0, + .size_in_file = 1024}, + true, + true), + }; + + { + PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 951), entry1); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 954), buildV3Id(TEST_NAMESPACE_ID, 951)); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.del(buildV3Id(TEST_NAMESPACE_ID, 951)); + edit.del(buildV3Id(TEST_NAMESPACE_ID, 951)); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 972), buildV3Id(TEST_NAMESPACE_ID, 954)); + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 985), buildV3Id(TEST_NAMESPACE_ID, 954)); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.del(buildV3Id(TEST_NAMESPACE_ID, 954)); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 998), buildV3Id(TEST_NAMESPACE_ID, 985)); + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 1011), buildV3Id(TEST_NAMESPACE_ID, 985)); + dir->apply(std::move(edit)); + } + + auto snap = dir->createSnapshot(); + ASSERT_ENTRY_EQ(entry1, dir, 998, snap); + + // Assume we download the data into this file offset + PageEntryV3 entry2{ + .file_id = 11, + .size = 1024, + .padded_size = 0, + .tag = 0, + .offset = 0xf0, + .checksum = 0xabcd, + .checkpoint_info = OptionalCheckpointInfo( + CheckpointLocation{ + .data_file_id = std::make_shared("s3://path/to/file"), + .offset_in_file = 0xa0, + .size_in_file = 1024}, + true, + true), + }; + + // Mock that "update remote" after download from remote store by "snap" + { + PageEntriesEdit edit; + edit.updateRemote(buildV3Id(TEST_NAMESPACE_ID, 998), entry2); + dir->updateLocalCacheForRemotePages(std::move(edit), snap, nullptr); + } + snap = dir->createSnapshot(); + ASSERT_ENTRY_EQ(entry2, dir, 998, snap); +} +CATCH + TEST_F(PageDirectoryTest, NewRefAfterDelRandom) try {