diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index e5427d6e562..f094179695d 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -12,11 +12,28 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P auto [wal, reader] = WALStore::create(file_provider, delegator); PageDirectoryPtr dir = std::make_unique(std::move(wal)); loadFromDisk(dir, std::move(reader)); - // TODO: After restored ends, set the last offset of log file for `wal` - if (blob_stats) - blob_stats->restore(); // Reset the `sequence` to the maximum of persisted. dir->sequence = max_applied_ver.sequence; + + if (blob_stats) + { + // After all entries restored to `mvcc_table_directory`, only apply + // the latest entry to `blob_stats`, or we may meet error since + // some entries may be removed in memory but not get compacted + // in the log file. + for (const auto & [page_id, entries] : dir->mvcc_table_directory) + { + (void)page_id; + if (auto entry = entries->getEntry(max_applied_ver.sequence); entry) + { + blob_stats->restoreByEntry(*entry); + } + } + + blob_stats->restore(); + } + + // TODO: After restored ends, set the last offset of log file for `wal` return dir; } @@ -67,8 +84,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr } case EditRecordType::VAR_ENTRY: version_list->fromRestored(r); - if (blob_stats) - blob_stats->restoreByEntry(r.entry); break; case EditRecordType::PUT_EXTERNAL: { @@ -82,8 +97,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr } case EditRecordType::PUT: version_list->createNewEntry(restored_version, r.entry); - if (blob_stats) - blob_stats->restoreByEntry(r.entry); break; case EditRecordType::DEL: case EditRecordType::VAR_DELETE: // nothing different from `DEL` @@ -94,8 +107,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr break; case EditRecordType::UPSERT: version_list->createNewEntry(restored_version, r.entry); - if (blob_stats) - blob_stats->restoreByEntry(r.entry); break; } } diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h index b75910edc46..f5fe35f60c7 100644 --- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h @@ -68,6 +68,31 @@ enum class EditRecordType VAR_DELETE, }; +inline const char * typeToString(EditRecordType t) +{ + switch (t) + { + case EditRecordType::PUT: + return "PUT "; + case EditRecordType::PUT_EXTERNAL: + return "EXT "; + case EditRecordType::REF: + return "REF "; + case EditRecordType::DEL: + return "DEL "; + case EditRecordType::UPSERT: + return "UPSERT "; + case EditRecordType::VAR_ENTRY: + return "VAR_ENT"; + case EditRecordType::VAR_REF: + return "VAR_REF"; + case EditRecordType::VAR_EXTERNAL: + return "VAR_EXT"; + case EditRecordType::VAR_DELETE: + return "VAR_DEL"; + } +} + /// Page entries change to apply to PageDirectory class PageEntriesEdit { @@ -176,10 +201,28 @@ class PageEntriesEdit PageIdV3Internal ori_page_id; PageVersionType version; PageEntryV3 entry; - Int64 being_ref_count = 1; + Int64 being_ref_count; + + EditRecord() + : page_id(0) + , ori_page_id(0) + , being_ref_count(1) + {} }; using EditRecords = std::vector; + static String toDebugString(const EditRecord & rec) + { + return fmt::format( + "{{type:{}, page_id:{}, ori_id:{}, version:{}, entry:{}, being_ref_count:{}}}", + typeToString(rec.type), + rec.page_id, + rec.ori_page_id, + rec.version, + DB::PS::V3::toDebugString(rec.entry), + rec.being_ref_count); + } + void appendRecord(const EditRecord & rec) { records.emplace_back(rec);