diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 94df459b025..12fe2cef2ed 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -63,8 +63,12 @@ void VersionedPageEntries::createNewEntry(const PageVersionType & ver, const Pag if (type == EditRecordType::VAR_ENTRY) { - auto last_iter = entries.rbegin(); - if (last_iter->second.isDelete()) + auto last_iter = MapUtils::findLess(entries, PageVersionType(ver.sequence + 1, 0)); + if (last_iter == entries.end()) + { + entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); + } + else if (last_iter->second.isDelete()) { entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); } @@ -302,10 +306,8 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * } else if (type == EditRecordType::VAR_EXTERNAL) { - // If we applied write batches like this: [ver=1]{putExternal 10}, [ver=2]{ref 11->10, del 10} - // then by ver=2, we should not able to read 10, but able to read 11 (resolving 11 ref to 10). - // when resolving 11 to 10, we need to set `check_prev` to true - bool ok = !is_deleted || (is_deleted && (check_prev ? (seq <= delete_ver.sequence) : (seq < delete_ver.sequence))); + // We may add reference to an external id even if it is logically deleted. + bool ok = check_prev ? true : (!is_deleted || (is_deleted && seq < delete_ver.sequence)); if (create_ver.sequence <= seq && ok) { return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersionType(0)}; @@ -354,8 +356,9 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersionType & ver) } else if (type == EditRecordType::VAR_EXTERNAL) { - if (create_ver <= ver && (!is_deleted || (is_deleted && ver < delete_ver))) + if (create_ver <= ver) { + // We may add reference to an external id even if it is logically deleted. return ++being_ref_count; } } @@ -397,7 +400,6 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( bool VersionedPageEntries::cleanOutdatedEntries( UInt64 lowest_seq, - PageIdV3Internal page_id, std::map> * normal_entries_to_deref, PageEntriesV3 & entries_removed, const PageLock & /*page_lock*/) @@ -416,15 +418,6 @@ bool VersionedPageEntries::cleanOutdatedEntries( // need to decrease the ref count by second.origin_page_id, ver=iter->first, num=1> if (auto [deref_counter, new_created] = normal_entries_to_deref->emplace(std::make_pair(ori_page_id, std::make_pair(/*ver=*/create_ver, /*count=*/1))); !new_created) { - if (deref_counter->second.first.sequence != create_ver.sequence) - { - throw Exception(fmt::format( - "There exist two different version of ref, should not happen [page_id={}] [ori_page_id={}] [ver={}] [another_ver={}]", - page_id, - ori_page_id, - create_ver, - deref_counter->second.first)); - } // the id is already exist in deref map, increase the num to decrease ref count deref_counter->second.second += 1; } @@ -508,7 +501,8 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag } else if (type == EditRecordType::VAR_ENTRY) { - // decrease the ref-counter + // Decrease the ref-counter. The entry may be moved to a newer entry with same sequence but higher epoch, + // so we need to find the one less than and decrease the ref-counter of it. auto iter = MapUtils::findMutLess(entries, PageVersionType(deref_ver.sequence + 1, 0)); if (iter == entries.end()) { @@ -530,7 +524,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag // Clean outdated entries after decreased the ref-counter // set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries - return cleanOutdatedEntries(lowest_seq, page_id, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock); + return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock); } throw Exception(fmt::format("calling derefAndClean with invalid state [state={}]", toDebugString())); @@ -863,7 +857,10 @@ void PageDirectory::applyRefEditRecord( const VersionedPageEntriesPtr & resolve_version_list = resolve_ver_iter->second; // If we already hold the lock from `id_to_resolve`, then we should not request it again. // This can happen when `id_to_resolve` have other operating in current writebatch - auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId(ver_to_resolve.sequence, false, nullptr); + auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId( + ver_to_resolve.sequence, + /*check_prev=*/true, + nullptr); switch (need_collapse) { case VersionedPageEntries::RESOLVE_FAIL: @@ -966,7 +963,7 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write case EditRecordType::VAR_ENTRY: case EditRecordType::VAR_EXTERNAL: case EditRecordType::VAR_REF: - throw Exception(fmt::format("should not handle {} edit", r.type)); + throw Exception(fmt::format("should not handle edit with invalid type [type={}]", r.type)); } } catch (DB::Exception & e) @@ -1000,7 +997,7 @@ void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiter iter = mvcc_table_directory.find(record.page_id); if (unlikely(iter == mvcc_table_directory.end())) { - throw Exception(fmt::format("Can't found [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR); + throw Exception(fmt::format("Can't find [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR); } } // release the read lock on `table_rw_mutex` @@ -1126,7 +1123,6 @@ PageEntriesV3 PageDirectory::gcInMemEntries() // do gc on the version list without lock on `mvcc_table_directory`. const bool all_deleted = iter->second->cleanOutdatedEntries( lowest_seq, - /*page_id=*/iter->first, &normal_entries_to_deref, all_del_entries, iter->second->acquireLock()); @@ -1161,8 +1157,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries() const bool all_deleted = iter->second->derefAndClean( lowest_seq, page_id, - deref_counter.first, - deref_counter.second, + /*deref_ver=*/deref_counter.first, + /*deref_count=*/deref_counter.second, all_del_entries); if (all_deleted) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index a148a03bab8..c8f9af8e8ea 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -17,8 +18,6 @@ #include #include -#include "Encryption/FileProvider.h" - namespace CurrentMetrics { extern const Metric PSMVCCNumSnapshots; @@ -205,7 +204,6 @@ class VersionedPageEntries */ bool cleanOutdatedEntries( UInt64 lowest_seq, - PageIdV3Internal page_id, std::map> * normal_entries_to_deref, PageEntriesV3 & entries_removed, const PageLock & page_lock); diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index e5427d6e562..d040e9fd378 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -12,17 +12,35 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P auto [wal, reader] = WALStore::create(file_provider, delegator); PageDirectoryPtr dir = std::make_unique(std::move(wal)); loadFromDisk(dir, std::move(reader)); - // TODO: After restored ends, set the last offset of log file for `wal` - if (blob_stats) - blob_stats->restore(); // Reset the `sequence` to the maximum of persisted. dir->sequence = max_applied_ver.sequence; + + if (blob_stats) + { + // After all entries restored to `mvcc_table_directory`, only apply + // the latest entry to `blob_stats`, or we may meet error since + // some entries may be removed in memory but not get compacted + // in the log file. + for (const auto & [page_id, entries] : dir->mvcc_table_directory) + { + (void)page_id; + if (auto entry = entries->getEntry(max_applied_ver.sequence); entry) + { + blob_stats->restoreByEntry(*entry); + } + } + + blob_stats->restore(); + } + + // TODO: After restored ends, set the last offset of log file for `wal` return dir; } PageDirectoryPtr PageDirectoryFactory::createFromEdit(FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit) { auto [wal, reader] = WALStore::create(file_provider, delegator); + (void)reader; PageDirectoryPtr dir = std::make_unique(std::move(wal)); loadEdit(dir, edit); if (blob_stats) @@ -67,8 +85,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr } case EditRecordType::VAR_ENTRY: version_list->fromRestored(r); - if (blob_stats) - blob_stats->restoreByEntry(r.entry); break; case EditRecordType::PUT_EXTERNAL: { @@ -82,8 +98,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr } case EditRecordType::PUT: version_list->createNewEntry(restored_version, r.entry); - if (blob_stats) - blob_stats->restoreByEntry(r.entry); break; case EditRecordType::DEL: case EditRecordType::VAR_DELETE: // nothing different from `DEL` @@ -94,8 +108,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr break; case EditRecordType::UPSERT: version_list->createNewEntry(restored_version, r.entry); - if (blob_stats) - blob_stats->restoreByEntry(r.entry); break; } } diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h index b75910edc46..f5fe35f60c7 100644 --- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h @@ -68,6 +68,31 @@ enum class EditRecordType VAR_DELETE, }; +inline const char * typeToString(EditRecordType t) +{ + switch (t) + { + case EditRecordType::PUT: + return "PUT "; + case EditRecordType::PUT_EXTERNAL: + return "EXT "; + case EditRecordType::REF: + return "REF "; + case EditRecordType::DEL: + return "DEL "; + case EditRecordType::UPSERT: + return "UPSERT "; + case EditRecordType::VAR_ENTRY: + return "VAR_ENT"; + case EditRecordType::VAR_REF: + return "VAR_REF"; + case EditRecordType::VAR_EXTERNAL: + return "VAR_EXT"; + case EditRecordType::VAR_DELETE: + return "VAR_DEL"; + } +} + /// Page entries change to apply to PageDirectory class PageEntriesEdit { @@ -176,10 +201,28 @@ class PageEntriesEdit PageIdV3Internal ori_page_id; PageVersionType version; PageEntryV3 entry; - Int64 being_ref_count = 1; + Int64 being_ref_count; + + EditRecord() + : page_id(0) + , ori_page_id(0) + , being_ref_count(1) + {} }; using EditRecords = std::vector; + static String toDebugString(const EditRecord & rec) + { + return fmt::format( + "{{type:{}, page_id:{}, ori_id:{}, version:{}, entry:{}, being_ref_count:{}}}", + typeToString(rec.type), + rec.page_id, + rec.ori_page_id, + rec.version, + DB::PS::V3::toDebugString(rec.entry), + rec.being_ref_count); + } + void appendRecord(const EditRecord & rec) { records.emplace_back(rec); diff --git a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp index d4d8b31de84..63974f76eba 100644 --- a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp +++ b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp @@ -79,9 +79,18 @@ WALStoreReader::findCheckpoint(LogFilenameSet && all_files) LogFilename latest_checkpoint = *latest_checkpoint_iter; for (auto iter = all_files.cbegin(); iter != all_files.cend(); /*empty*/) { - if (iter->log_num < latest_checkpoint.log_num) + // We use as the checkpoint, so all files less than or equal + // to latest_checkpoint.log_num can be erase + if (iter->log_num <= latest_checkpoint.log_num) { - // TODO: clean useless file that is older than `checkpoint` + if (iter->log_num == latest_checkpoint.log_num && iter->level_num != 0) + { + // the checkpoint file, not remove + } + else + { + // TODO: clean useless file that is older than `checkpoint` + } iter = all_files.erase(iter); } else @@ -186,6 +195,7 @@ bool WALStoreReader::openNextFile() if (!checkpoint_read_done) { do_open(*checkpoint_file); + checkpoint_read_done = true; } else { diff --git a/dbms/src/Storages/Page/V3/WAL/serialize.cpp b/dbms/src/Storages/Page/V3/WAL/serialize.cpp index d51d67bff34..26f3f97e22e 100644 --- a/dbms/src/Storages/Page/V3/WAL/serialize.cpp +++ b/dbms/src/Storages/Page/V3/WAL/serialize.cpp @@ -64,7 +64,7 @@ void serializePutTo(const PageEntriesEdit::EditRecord & record, WriteBuffer & bu { assert(record.type == EditRecordType::PUT || record.type == EditRecordType::UPSERT || record.type == EditRecordType::VAR_ENTRY); - writeIntBinary(EditRecordType::PUT, buf); + writeIntBinary(record.type, buf); UInt32 flags = 0; writeIntBinary(flags, buf); diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index b9fa1274c06..fea7a1c8361 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -114,10 +115,22 @@ std::tuple, LogFilename> WALStore::createLogWriter( WALStore::FilesSnapshot WALStore::getFilesSnapshot() const { - const auto current_writting_log_num = [this]() { + const auto [ok, current_writting_log_num] = [this]() -> std::tuple { std::lock_guard lock(log_file_mutex); - return log_file->logNumber(); + if (!log_file) + { + return {false, 0}; + } + return {true, log_file->logNumber()}; }(); + // Return empty set if `log_file` is not ready + if (!ok) + { + return WALStore::FilesSnapshot{ + .current_writting_log_num = 0, + .persisted_log_files = {}, + }; + } // Only those files are totally persisted LogFilenameSet persisted_log_files = WALStoreReader::listAllFiles(delegator, logger); @@ -163,12 +176,24 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire LOG_FMT_INFO(logger, "Rename log file to normal done [fullname={}]", normal_fullname); } + // #define ARCHIVE_COMPACTED_LOGS // keep for debug + // Remove compacted log files. for (const auto & filename : files_snap.persisted_log_files) { if (auto f = Poco::File(filename.fullname(LogFileStage::Normal)); f.exists()) { +#ifndef ARCHIVE_COMPACTED_LOGS f.remove(); +#else + const Poco::Path archive_path(delegator->defaultPath(), "archive"); + Poco::File archive_dir(archive_path); + if (!archive_dir.exists()) + archive_dir.createDirectory(); + auto dest = archive_path.toString() + "/" + filename.filename(LogFileStage::Normal); + f.moveTo(dest); + LOG_FMT_INFO(logger, "archive {} to {}", filename.fullname(LogFileStage::Normal), dest); +#endif } } // TODO: Log more information. duration, num entries, size of compact log file... diff --git a/dbms/src/Storages/Page/V3/tests/entries_helper.h b/dbms/src/Storages/Page/V3/tests/entries_helper.h index 00375d16fe3..f2ff6c9e3bc 100644 --- a/dbms/src/Storages/Page/V3/tests/entries_helper.h +++ b/dbms/src/Storages/Page/V3/tests/entries_helper.h @@ -145,7 +145,7 @@ inline ::testing::AssertionResult getEntriesCompare( { // not the expected entry we want String err_msg; - auto expect_expr = fmt::format("Entry at {} [index={}]", idx); + auto expect_expr = fmt::format("Entry at {} [index={}]", idx, idx); auto actual_expr = fmt::format("Get entries {} from {} with snap {} [index={}", page_ids_expr, dir_expr, snap_expr, idx); return testing::internal::EqFailure( expect_expr.c_str(), diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index d71cb1c78a9..75279c87bbb 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -508,6 +508,36 @@ TEST_F(PageDirectoryTest, NewRefAfterDel) } } +TEST_F(PageDirectoryTest, RefToExt) +try +{ + { + PageEntriesEdit edit; + edit.putExternal(83); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(85, 83); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.del(83); + dir->apply(std::move(edit)); + } + // The external id "83" is not changed, + // we may add ref to external "83" even + // if it is logical delete but have other + // alive reference page. + { + PageEntriesEdit edit; + edit.ref(86, 83); + dir->apply(std::move(edit)); + } +} +CATCH + TEST_F(PageDirectoryTest, NormalPageId) try { @@ -583,16 +613,12 @@ CATCH class VersionedEntriesTest : public ::testing::Test { public: - void SetUp() override - { - } - using DerefCounter = std::map>; std::tuple runClean(UInt64 seq) { DerefCounter deref_counter; PageEntriesV3 removed_entries; - bool all_removed = entries.cleanOutdatedEntries(seq, buildV3Id(TEST_NAMESPACE_ID, page_id), &deref_counter, removed_entries, entries.acquireLock()); + bool all_removed = entries.cleanOutdatedEntries(seq, &deref_counter, removed_entries, entries.acquireLock()); return {all_removed, removed_entries, deref_counter}; } @@ -666,6 +692,15 @@ TEST_F(VersionedEntriesTest, InsertGet) } } +TEST_F(VersionedEntriesTest, InsertWithLowerVersion) +{ + INSERT_ENTRY(5); + ASSERT_SAME_ENTRY(*entries.getEntry(5), entry_v5); + ASSERT_FALSE(entries.getEntry(2).has_value()); + INSERT_ENTRY(2); + ASSERT_SAME_ENTRY(*entries.getEntry(2), entry_v2); +} + TEST_F(VersionedEntriesTest, GC) try { @@ -1554,6 +1589,117 @@ try } CATCH +TEST_F(PageDirectoryGCTest, GCOnRefedEntries2) +try +{ + // 10->entry1, 11->10=>11->entry1; del 10->entry1 + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(10, entry1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(11, 10); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(12, 10); + edit.del(10); + dir->apply(std::move(edit)); + } + // entry1 should not be removed + { + auto outdated_entries = dir->gcInMemEntries(); + EXPECT_TRUE(outdated_entries.empty()); + } + + // del 11->entry1 + { + PageEntriesEdit edit; + edit.del(11); + edit.del(12); + dir->apply(std::move(edit)); + } + // entry1 get removed + { + auto outdated_entries = dir->gcInMemEntries(); + EXPECT_EQ(1, outdated_entries.size()); + EXPECT_SAME_ENTRY(entry1, *outdated_entries.begin()); + } +} +CATCH + +TEST_F(PageDirectoryGCTest, UpsertOnRefedEntries) +try +{ + // 10->entry1, 11->10, 12->10 + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(10, entry1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(11, 10); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(12, 10); + edit.del(10); + dir->apply(std::move(edit)); + } + // entry1 should not be removed + { + auto outdated_entries = dir->gcInMemEntries(); + EXPECT_TRUE(outdated_entries.empty()); + } + + // upsert 10->entry2 + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + auto full_gc_entries = dir->getEntriesByBlobIds({1}); + auto ids = full_gc_entries.first.at(1); + ASSERT_EQ(ids.size(), 1); + edit.upsertPage(std::get<0>(ids[0]), std::get<1>(ids[0]), entry2); + dir->gcApply(std::move(edit)); + } + + auto removed_entries = dir->gcInMemEntries(); + ASSERT_EQ(removed_entries.size(), 1); + EXPECT_SAME_ENTRY(removed_entries[0], entry1); + + { + auto snap = dir->createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 11, snap); + EXPECT_ENTRY_EQ(entry2, dir, 12, snap); + } + + // del 11->entry2 + { + PageEntriesEdit edit; + edit.del(11); + dir->apply(std::move(edit)); + EXPECT_EQ(dir->gcInMemEntries().size(), 0); + } + // del 12->entry2 + { + PageEntriesEdit edit; + edit.del(12); + dir->apply(std::move(edit)); + // entry2 get removed + auto outdated_entries = dir->gcInMemEntries(); + EXPECT_EQ(1, outdated_entries.size()); + EXPECT_SAME_ENTRY(entry2, *outdated_entries.begin()); + } +} +CATCH + TEST_F(PageDirectoryGCTest, GCOnRefedExternalEntries) try { diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index cac4a2c89a6..d2b85b29468 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -844,6 +844,47 @@ try } CATCH +TEST_F(PageStorageTest, GcReuseSpaceThenRestore) +try +{ + DB::UInt64 tag = 0; + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, tag, buff, buf_sz); + page_storage->write(std::move(batch)); + } + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, tag, buff, buf_sz); + page_storage->write(std::move(batch)); + } + + { + SCOPED_TRACE("fist gc"); + page_storage->gc(); + } + + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, tag, buff, buf_sz); + page_storage->write(std::move(batch)); + } + + page_storage.reset(); + page_storage = reopenWithConfig(config); +} +CATCH + } // namespace PS::V3::tests } // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp index 21d03a7707c..d5ebdab0a11 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp @@ -100,17 +100,17 @@ TEST(WALSeriTest, Upserts) auto deseri_edit = DB::PS::V3::ser::deserializeFrom(DB::PS::V3::ser::serializeTo(edit)); ASSERT_EQ(deseri_edit.size(), 3); auto iter = deseri_edit.getRecords().begin(); - EXPECT_EQ(iter->type, EditRecordType::PUT); // deser as put + EXPECT_EQ(iter->type, EditRecordType::UPSERT); EXPECT_EQ(iter->page_id.low, 1); EXPECT_EQ(iter->version, ver20_1); EXPECT_SAME_ENTRY(iter->entry, entry_p1_2); iter++; - EXPECT_EQ(iter->type, EditRecordType::PUT); // deser as put + EXPECT_EQ(iter->type, EditRecordType::UPSERT); EXPECT_EQ(iter->page_id.low, 3); EXPECT_EQ(iter->version, ver21_1); EXPECT_SAME_ENTRY(iter->entry, entry_p3_2); iter++; - EXPECT_EQ(iter->type, EditRecordType::PUT); // deser as put + EXPECT_EQ(iter->type, EditRecordType::UPSERT); EXPECT_EQ(iter->page_id.low, 5); EXPECT_EQ(iter->version, ver21_1); EXPECT_SAME_ENTRY(iter->entry, entry_p5_2); @@ -212,6 +212,56 @@ class WALStoreTest : public DB::base::TiFlashStorageTestBasic PSDiskDelegatorPtr delegator; }; +TEST_F(WALStoreTest, FindCheckpointFile) +{ + Poco::Logger * log = &Poco::Logger::get("WALStoreTest"); + auto path = getTemporaryPath(); + + { + // no checkpoint + LogFilenameSet files{ + LogFilename::parseFrom(path, "log_1_0", log), + LogFilename::parseFrom(path, "log_2_0", log), + LogFilename::parseFrom(path, "log_3_0", log), + LogFilename::parseFrom(path, "log_4_0", log), + }; + auto [cp, files_to_read] = WALStoreReader::findCheckpoint(std::move(files)); + ASSERT_FALSE(cp.has_value()); + EXPECT_EQ(files_to_read.size(), 4); + } + + { + // checkpoint and some other logfiles + LogFilenameSet files{ + LogFilename::parseFrom(path, "log_12_1", log), + LogFilename::parseFrom(path, "log_13_0", log), + LogFilename::parseFrom(path, "log_14_0", log), + }; + auto [cp, files_to_read] = WALStoreReader::findCheckpoint(std::move(files)); + ASSERT_TRUE(cp.has_value()); + EXPECT_EQ(cp->log_num, 12); + EXPECT_EQ(cp->level_num, 1); + EXPECT_EQ(files_to_read.size(), 2); + } + + { + // some files before checkpoint left on disk + LogFilenameSet files{ + LogFilename::parseFrom(path, "log_10_0", log), + LogFilename::parseFrom(path, "log_11_0", log), + LogFilename::parseFrom(path, "log_12_0", log), + LogFilename::parseFrom(path, "log_12_1", log), + LogFilename::parseFrom(path, "log_13_0", log), + LogFilename::parseFrom(path, "log_14_0", log), + }; + auto [cp, files_to_read] = WALStoreReader::findCheckpoint(std::move(files)); + ASSERT_TRUE(cp.has_value()); + EXPECT_EQ(cp->log_num, 12); + EXPECT_EQ(cp->level_num, 1); + EXPECT_EQ(files_to_read.size(), 2); + } +} + TEST_F(WALStoreTest, Empty) { auto ctx = DB::tests::TiFlashTestEnv::getContext(); @@ -223,6 +273,7 @@ TEST_F(WALStoreTest, Empty) while (reader->remained()) { auto [ok, edit] = reader->next(); + (void)edit; if (!ok) { reader->throwIfError();