diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 86b9e4fad0f..4fd98fd205f 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -198,8 +198,8 @@ namespace DB F(type_clean_external, {{"type", "clean_external"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \ M(tiflash_storage_page_command_count, "Total number of PageStorage's command, such as write / read / scan / snapshot", Counter, \ - F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), F(type_read_page_dir, {"type", "read_page_dir"}), \ - F(type_read_blob, {"type", "read_blob"}), F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \ + F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), \ + F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \ M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \ F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \ M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \ diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 0c38034f991..20a6afb51f4 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -899,6 +899,7 @@ template typename BlobStore::PageMap BlobStore::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_limiter) { + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (entries.empty()) { return {}; @@ -1001,6 +1002,7 @@ BlobStore::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_l template Page BlobStore::read(const PageIdAndEntry & id_entry, const ReadLimiterPtr & read_limiter) { + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); const auto & [page_id_v3, entry] = id_entry; const size_t buf_size = entry.size; @@ -1060,7 +1062,7 @@ Page BlobStore::read(const PageIdAndEntry & id_entry, const ReadLimiterPt template BlobFilePtr BlobStore::read(const typename BlobStore::PageId & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background) { - GET_METRIC(tiflash_storage_page_command_count, type_read_blob).Increment(); + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); assert(buffers != nullptr); BlobFilePtr blob_file = getBlobFile(blob_id); try diff --git a/dbms/src/Storages/Page/V3/GCDefines.cpp b/dbms/src/Storages/Page/V3/GCDefines.cpp index 3ebbfa09dcb..f72979860af 100644 --- a/dbms/src/Storages/Page/V3/GCDefines.cpp +++ b/dbms/src/Storages/Page/V3/GCDefines.cpp @@ -237,6 +237,14 @@ GCTimeStatistics ExternalPageCallbacksManager::doGC( // 1. Do the MVCC gc, clean up expired snapshot. // And get the expired entries. + statistics.compact_wal_happen = page_directory.tryDumpSnapshot(read_limiter, write_limiter, force_wal_compact); + if (statistics.compact_wal_happen) + { + GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment(); + } + statistics.compact_wal_ms = gc_watch.elapsedMillisecondsFromLastTime(); + GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_wal).Observe(statistics.compact_wal_ms / 1000.0); + typename Trait::PageDirectory::InMemGCOption options; if constexpr (std::is_same_v) { @@ -247,15 +255,6 @@ GCTimeStatistics ExternalPageCallbacksManager::doGC( statistics.compact_directory_ms = gc_watch.elapsedMillisecondsFromLastTime(); GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_directory).Observe(statistics.compact_directory_ms / 1000.0); - // Compact WAL after in-memory GC in PageDirectory in order to reduce the overhead of dumping useless entries - statistics.compact_wal_happen = page_directory.tryDumpSnapshot(write_limiter, force_wal_compact); - if (statistics.compact_wal_happen) - { - GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment(); - } - statistics.compact_wal_ms = gc_watch.elapsedMillisecondsFromLastTime(); - GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_wal).Observe(statistics.compact_wal_ms / 1000.0); - SYNC_FOR("before_PageStorageImpl::doGC_fullGC_prepare"); // 2. Remove the expired entries in BlobStore. diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 911e39f028a..3e7460a7067 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -103,7 +103,7 @@ void VersionedPageEntries::createNewEntry(const PageVersion & ver, const assert(last_iter->second.isEntry()); // It is ok to replace the entry with same sequence and newer epoch, but not valid // to replace the entry with newer sequence. - if (unlikely(last_iter->second.being_ref_count.getLatestRefCount() != 1 && last_iter->first.sequence < ver.sequence)) + if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence)) { throw Exception( fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]", @@ -151,7 +151,7 @@ typename VersionedPageEntries::PageId VersionedPageEntries::create assert(last_iter->second.isEntry()); // It is ok to replace the entry with same sequence and newer epoch, but not valid // to replace the entry with newer sequence. - if (unlikely(last_iter->second.being_ref_count.getLatestRefCount() != 1 && last_iter->first.sequence < ver.sequence)) + if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence)) { throw Exception( fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]", @@ -218,6 +218,7 @@ std::shared_ptr::PageId> VersionedPageEntri is_deleted = false; create_ver = ver; delete_ver = PageVersion(0); + being_ref_count = 1; RUNTIME_CHECK(entries.empty()); entries.emplace(create_ver, EntryOrDelete::newNormalEntry(entry)); // return the new created holder to caller to set the page_id @@ -235,6 +236,7 @@ std::shared_ptr::PageId> VersionedPageEntri is_deleted = false; create_ver = ver; delete_ver = PageVersion(0); + being_ref_count = 1; entries.emplace(create_ver, EntryOrDelete::newNormalEntry(entry)); // return the new created holder to caller to set the page_id external_holder = std::make_shared(); @@ -400,15 +402,15 @@ std::shared_ptr::PageId> VersionedPageEntri type = EditRecordType::VAR_EXTERNAL; is_deleted = false; create_ver = rec.version; - being_ref_count.restoreFrom(rec.version, rec.being_ref_count); - entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.version, 1 /* meaningless */)); + being_ref_count = rec.being_ref_count; + entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.being_ref_count)); external_holder = std::make_shared(rec.page_id); return external_holder; } case EditRecordType::VAR_ENTRY: { type = EditRecordType::VAR_ENTRY; - entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.version, rec.being_ref_count)); + entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.being_ref_count)); return nullptr; } default: @@ -612,12 +614,12 @@ bool VersionedPageEntries::isVisible(UInt64 seq) const } template -Int64 VersionedPageEntries::incrRefCount(const PageVersion & target_ver, const PageVersion & ref_ver) +Int64 VersionedPageEntries::incrRefCount(const PageVersion & ver) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) { - if (auto iter = MapUtils::findMutLess(entries, PageVersion(target_ver.sequence + 1)); + if (auto iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1)); iter != entries.end()) { // ignore all "delete" @@ -630,27 +632,23 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersion & target_ver, // Then `iter` point to an entry or the `entries.begin()`, return if entry found if (iter->second.isEntry()) { - auto ref_count_value = iter->second.being_ref_count.getLatestRefCount(); - if (unlikely(met_delete && ref_count_value == 1)) + if (unlikely(met_delete && iter->second.being_ref_count == 1)) { - throw Exception(fmt::format("Try to add ref to a completely deleted entry [entry={}] [ver={}]", iter->second, target_ver), ErrorCodes::LOGICAL_ERROR); + throw Exception(fmt::format("Try to add ref to a completely deleted entry [entry={}] [ver={}]", iter->second, ver), ErrorCodes::LOGICAL_ERROR); } - iter->second.being_ref_count.incrRefCount(ref_ver, 1); - return ref_count_value + 1; + return ++iter->second.being_ref_count; } } // fallthrough to FAIL } else if (type == EditRecordType::VAR_EXTERNAL) { - if (create_ver <= target_ver) + if (create_ver <= ver) { // We may add reference to an external id even if it is logically deleted. - auto ref_count_value = being_ref_count.getLatestRefCount(); - being_ref_count.incrRefCount(ref_ver, 1); - return ref_count_value + 1; + return ++being_ref_count; } } - throw Exception(fmt::format("The entry to be added ref count is not found [ver={}] [state={}]", target_ver, toDebugString()), ErrorCodes::LOGICAL_ERROR); + throw Exception(fmt::format("The entry to be added ref count is not found [ver={}] [state={}]", ver, toDebugString()), ErrorCodes::LOGICAL_ERROR); } template @@ -711,7 +709,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( { if (type == EditRecordType::VAR_EXTERNAL) { - return (being_ref_count.getLatestRefCount() == 1 && is_deleted && delete_ver.sequence <= lowest_seq); + return (being_ref_count == 1 && is_deleted && delete_ver.sequence <= lowest_seq); } else if (type == EditRecordType::VAR_REF) { @@ -789,7 +787,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( { if (last_entry_is_delete) { - if (iter->second.being_ref_count.getLatestRefCount() == 1) + if (iter->second.being_ref_count == 1) { if (entries_removed) { @@ -831,8 +829,12 @@ bool VersionedPageEntries::derefAndClean( auto page_lock = acquireLock(); if (type == EditRecordType::VAR_EXTERNAL) { - being_ref_count.decrRefCountInSnap(lowest_seq, deref_count); - return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count.getLatestRefCount() == 1); + if (being_ref_count <= deref_count) + { + throw Exception(fmt::format("Decreasing ref count error [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count)); + } + being_ref_count -= deref_count; + return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count == 1); } else if (type == EditRecordType::VAR_ENTRY) { @@ -855,7 +857,11 @@ bool VersionedPageEntries::derefAndClean( throw Exception(fmt::format("Can not find entry for decreasing ref count till the begin [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count)); } assert(iter->second.isEntry()); - iter->second.being_ref_count.decrRefCountInSnap(lowest_seq, deref_count); + if (iter->second.being_ref_count <= deref_count) + { + throw Exception(fmt::format("Decreasing ref count error [page_id={}] [ver={}] [deref_count={}] [entry={}]", page_id, deref_ver, deref_count, iter->second)); + } + iter->second.being_ref_count -= deref_count; if (lowest_seq == 0) return false; @@ -892,7 +898,7 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageId & pa return; auto iter = entries.find(create_ver); RUNTIME_CHECK(iter != entries.end()); - edit.varExternal(page_id, create_ver, iter->second.entry, being_ref_count.getRefCountInSnap(seq)); + edit.varExternal(page_id, create_ver, iter->second.entry, being_ref_count); if (is_deleted && delete_ver.sequence <= seq) { edit.varDel(page_id, delete_ver); @@ -910,7 +916,7 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageId & pa if (last_iter->second.isEntry()) { const auto & entry = last_iter->second; - edit.varEntry(page_id, /*ver*/ last_iter->first, entry.entry, entry.being_ref_count.getRefCountInSnap(seq)); + edit.varEntry(page_id, /*ver*/ last_iter->first, entry.entry, entry.being_ref_count); return; } else if (last_iter->second.isDelete()) @@ -924,12 +930,11 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageId & pa auto prev_iter = --last_iter; // Note that `last_iter` should not be used anymore if (prev_iter->second.isEntry()) { - auto ref_count_value = prev_iter->second.being_ref_count.getRefCountInSnap(seq); - if (ref_count_value == 1) + if (prev_iter->second.being_ref_count == 1) return; // It is being ref by another id, should persist the item and delete const auto & entry = prev_iter->second; - edit.varEntry(page_id, prev_iter->first, entry.entry, ref_count_value); + edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count); edit.varDel(page_id, last_version); } } @@ -1016,7 +1021,7 @@ SnapshotsStatistics PageDirectory::getSnapshotsStat() const template typename PageDirectory::PageIdAndEntry PageDirectory::getByIDImpl(const PageId & page_id, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const { - GET_METRIC(tiflash_storage_page_command_count, type_read_page_dir).Increment(); + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); PageEntryV3 entry_got; // After two write batches applied: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10}, the `mvcc_table_directory` is: @@ -1111,7 +1116,7 @@ template std::pair::PageIdAndEntries, typename PageDirectory::PageIds> PageDirectory::getByIDsImpl(const typename PageDirectory::PageIds & page_ids, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const { - GET_METRIC(tiflash_storage_page_command_count, type_read_page_dir).Increment(); + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); PageEntryV3 entry_got; PageIds page_not_found = {}; @@ -1437,7 +1442,7 @@ void PageDirectory::applyRefEditRecord( // Add the ref-count of being-ref entry if (auto resolved_iter = mvcc_table_directory.find(resolved_id); resolved_iter != mvcc_table_directory.end()) { - resolved_iter->second->incrRefCount(resolved_ver, version); + resolved_iter->second->incrRefCount(resolved_ver); } else { @@ -1829,8 +1834,7 @@ PageDirectory::getEntriesByBlobIdsForDifferentPageTypes(const typename Pa return page_type_and_gc_info; } -template -bool PageDirectory::tryDumpSnapshot(const WriteLimiterPtr & write_limiter, bool force) +bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter, bool force) { // Only apply compact logs when files snapshot is valid auto files_snap = wal->tryGetFilesSnapshot(max_persisted_log_files, force); @@ -1846,19 +1850,43 @@ bool PageDirectory::tryDumpSnapshot(const WriteLimiterPtr & write_limiter auto identifier = fmt::format("{}.dump_{}", wal->name(), log_num); Stopwatch watch; + auto snapshot_reader = wal->createReaderForFiles(identifier, files_snap.persisted_log_files, read_limiter); + // we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that + // persist new logs into disk. So we pass `nullptr` as `wal` to the factory. + auto collapsed_dir = [&]() { + // we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that + // persist new logs into disk. So we pass `nullptr` as `wal` to the factory. + static_assert(std::is_same_v || std::is_same_v, + "unknown impl"); + if constexpr (std::is_same_v) + { + u128::PageDirectoryFactory factory; + return factory.createFromReader( + identifier, + std::move(snapshot_reader), + /* wal */ nullptr); + } + else if constexpr (std::is_same_v) + { + universal::PageDirectoryFactory factory; + return factory.createFromReader( + identifier, + std::move(snapshot_reader), + /* wal */ nullptr); + } + }(); // The records persisted in `files_snap` is older than or equal to all records in `edit` - auto snap = createSnapshot(identifier); - auto edit = dumpSnapshotToEdit(snap); - files_snap.num_records = edit.size(); - files_snap.dump_elapsed_ms = watch.elapsedMilliseconds(); + auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit(); + files_snap.num_records = edit_from_disk.size(); + files_snap.read_elapsed_ms = watch.elapsedMilliseconds(); if constexpr (std::is_same_v) { - bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit), write_limiter); + bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit_from_disk), write_limiter); return done_any_io; } else if constexpr (std::is_same_v) { - bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit), write_limiter); + bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit_from_disk), write_limiter); return done_any_io; } } @@ -1939,8 +1967,6 @@ typename PageDirectory::PageEntries PageDirectory::gcInMemEntries( } } - SYNC_FOR("after_PageDirectory::doGC_getLowestSeq"); - PageEntriesV3 all_del_entries; typename MVCCMapType::iterator iter; { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 152af1c9bc9..7626ec2a9e6 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -82,139 +82,25 @@ class PageDirectorySnapshot : public DB::PageStorageSnapshot }; using PageDirectorySnapshotPtr = std::shared_ptr; -// This is not a thread safe class -// It must be used with outer synchronization -class MultiVersionRefCount -{ -public: - MultiVersionRefCount() = default; - - MultiVersionRefCount(const MultiVersionRefCount & other) - { - if (other.versioned_ref_counts) - { - versioned_ref_counts = std::make_unique>>(); - for (const auto & [ver, ref_count] : *(other.versioned_ref_counts)) - { - versioned_ref_counts->emplace_back(ver, ref_count); - } - } - else - { - versioned_ref_counts = nullptr; - } - } - - void restoreFrom(const PageVersion & ver, Int64 ref_count) - { - // versioned_ref_counts being nullptr always means ref count 1 - RUNTIME_CHECK(ref_count > 0); - RUNTIME_CHECK(!versioned_ref_counts); - if (ref_count == 1) - { - return; - } - versioned_ref_counts = std::make_unique>>(); - // empty `versioned_ref_counts` means ref count 1, so the ref count delta here is ref_count - 1 - versioned_ref_counts->emplace_back(ver, ref_count - 1); - } - - void incrRefCount(const PageVersion & ver, Int64 ref_count_delta) - { - if (!versioned_ref_counts) - { - versioned_ref_counts = std::make_unique>>(); - } - versioned_ref_counts->emplace_back(ver, ref_count_delta); - } - - void decrRefCountInSnap(UInt64 snap_seq, Int64 deref_count_delta) - { - if (snap_seq == 0) - { - // When `snap_seq` is 0, it means the deref operation is caused by rewritting a ref page to a normal page. - // Actually we can collapse versioned_ref_counts here too because there should be no other gc happend concurrently. - // But collpase it when `snap_seq` is not zero should be enough. So we just do an append here. - versioned_ref_counts->emplace_back(PageVersion(0, 0), -deref_count_delta); - } - else - { - auto new_versioned_ref_counts = std::make_unique>>(); - Int64 ref_count_delta_in_snap = -deref_count_delta; - for (const auto & [ver, ref_count_delta] : *versioned_ref_counts) - { - if (ver.sequence <= snap_seq) - { - ref_count_delta_in_snap += ref_count_delta; - } - else - { - new_versioned_ref_counts->emplace_back(ver, ref_count_delta); - } - } - if (ref_count_delta_in_snap == 0 && new_versioned_ref_counts->empty()) - { - versioned_ref_counts = nullptr; - return; - } - RUNTIME_CHECK(ref_count_delta_in_snap > 0, deref_count_delta, ref_count_delta_in_snap); - new_versioned_ref_counts->emplace_back(PageVersion(snap_seq, 0), ref_count_delta_in_snap); - versioned_ref_counts.swap(new_versioned_ref_counts); - } - } - - Int64 getRefCountInSnap(UInt64 snap_seq) const - { - if (!versioned_ref_counts) - { - return 1; - } - return std::accumulate( - versioned_ref_counts->begin(), - versioned_ref_counts->end(), - 1, - [&](Int64 acc, const auto & ver_ref_count) { - return acc + (ver_ref_count.first.sequence <= snap_seq ? ver_ref_count.second : 0); - }); - } - - Int64 getLatestRefCount() const - { - if (!versioned_ref_counts) - { - return 1; - } - return std::accumulate( - versioned_ref_counts->begin(), - versioned_ref_counts->end(), - 1, - [](Int64 acc, const auto & ver_ref_count) { return acc + ver_ref_count.second; }); - } - -#ifndef DBMS_PUBLIC_GTEST -private: -#endif - // store the ref count delta for each version, empty means ref count 1 - std::unique_ptr>> versioned_ref_counts; -}; - struct EntryOrDelete { bool is_delete = true; - MultiVersionRefCount being_ref_count; + Int64 being_ref_count = 1; PageEntryV3 entry; static EntryOrDelete newDelete() { return EntryOrDelete{ .is_delete = true, + .being_ref_count = 1, // meaningless .entry = {}, // meaningless }; - }; + } static EntryOrDelete newNormalEntry(const PageEntryV3 & entry) { return EntryOrDelete{ .is_delete = false, + .being_ref_count = 1, .entry = entry, }; } @@ -227,24 +113,17 @@ struct EntryOrDelete }; } - static EntryOrDelete newFromRestored(PageEntryV3 entry, const PageVersion & ver, Int64 being_ref_count) + static EntryOrDelete newFromRestored(PageEntryV3 entry, Int64 being_ref_count) { - auto result = EntryOrDelete{ + return EntryOrDelete{ .is_delete = false, + .being_ref_count = being_ref_count, .entry = entry, }; - result.being_ref_count.restoreFrom(ver, being_ref_count); - return result; } - bool isDelete() const - { - return is_delete; - } - bool isEntry() const - { - return !is_delete; - } + bool isDelete() const { return is_delete; } + bool isEntry() const { return !is_delete; } }; using PageLock = std::lock_guard; @@ -273,6 +152,7 @@ class VersionedPageEntries , create_ver(0) , delete_ver(0) , ori_page_id{} + , being_ref_count(1) {} bool isExternalPage() const { return type == EditRecordType::VAR_EXTERNAL; } @@ -305,7 +185,7 @@ class VersionedPageEntries std::tuple resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry); - Int64 incrRefCount(const PageVersion & target_ver, const PageVersion & ref_ver); + Int64 incrRefCount(const PageVersion & ver); std::optional getEntry(UInt64 seq) const; @@ -377,7 +257,7 @@ class VersionedPageEntries is_deleted, delete_ver, ori_page_id, - being_ref_count.getLatestRefCount(), + being_ref_count, entries.size()); } template @@ -404,7 +284,7 @@ class VersionedPageEntries // Original page id, valid when type == VAR_REF PageId ori_page_id; // Being ref counter, valid when type == VAR_EXTERNAL - MultiVersionRefCount being_ref_count; + Int64 being_ref_count; // A shared ptr to a holder, valid when type == VAR_EXTERNAL std::shared_ptr external_holder; }; @@ -487,7 +367,10 @@ class PageDirectory void gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter = nullptr); - bool tryDumpSnapshot(const WriteLimiterPtr & write_limiter = nullptr, bool force = false); + /// When create PageDirectory for dump snapshot, we should keep the last valid var_entry when it is deleted. + /// Because there may be some upsert entry in later wal files, and we should keep the valid var_entry and the delete entry to delete the later upsert entry. + /// And we don't restore the entries in blob store, because this PageDirectory is just read only for its entries. + bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr, bool force = false); size_t copyCheckpointInfoFromEdit(const PageEntriesEdit & edit); @@ -672,6 +555,6 @@ struct fmt::formatter "{{is_delete:{}, entry:{}, being_ref_count:{}}}", entry.is_delete, entry.entry, - entry.being_ref_count.getLatestRefCount()); + entry.being_ref_count); } }; diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 35079cd7531..5c71c4eeea8 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -176,7 +176,6 @@ DB::PageEntry PageStorageImpl::getEntryImpl(NamespaceID ns_id, PageIdU64 page_id DB::Page PageStorageImpl::readImpl(NamespaceID ns_id, PageIdU64 page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) { - GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (!snapshot) { snapshot = this->getSnapshot(""); @@ -188,7 +187,6 @@ DB::Page PageStorageImpl::readImpl(NamespaceID ns_id, PageIdU64 page_id, const R PageMapU64 PageStorageImpl::readImpl(NamespaceID ns_id, const PageIdU64s & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) { - GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (!snapshot) { snapshot = this->getSnapshot(""); @@ -219,7 +217,6 @@ PageMapU64 PageStorageImpl::readImpl(NamespaceID ns_id, const PageIdU64s & page_ PageMapU64 PageStorageImpl::readImpl(NamespaceID ns_id, const std::vector & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) { - GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (!snapshot) { snapshot = this->getSnapshot(""); diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp index fe03d8cd0c8..f42971f02c4 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp +++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp @@ -113,7 +113,6 @@ void UniversalPageStorage::write(UniversalWriteBatch && write_batch, PageType pa Page UniversalPageStorage::read(const UniversalPageId & page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) const { - GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (!snapshot) { snapshot = this->getSnapshot(""); @@ -138,7 +137,6 @@ Page UniversalPageStorage::read(const UniversalPageId & page_id, const ReadLimit UniversalPageMap UniversalPageStorage::read(const UniversalPageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) const { - GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (!snapshot) { snapshot = this->getSnapshot(""); @@ -190,7 +188,6 @@ UniversalPageMap UniversalPageStorage::read(const UniversalPageIds & page_ids, c UniversalPageMap UniversalPageStorage::read(const std::vector & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) const { - GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (!snapshot) { snapshot = this->getSnapshot(""); diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index bc002b6c47f..707e2da96fa 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -258,8 +258,8 @@ bool WALStore::saveSnapshot( fb.append(arg.filename(arg.stage)); }, ", "); - fmt_buf.fmtAppend("] [dump_cost={}] [num_records={}] [file={}] [size={}].", - files_snap.dump_elapsed_ms, + fmt_buf.fmtAppend("] [read_cost={}] [num_records={}] [file={}] [size={}].", + files_snap.read_elapsed_ms, files_snap.num_records, normal_fullname, serialized_snap.size()); diff --git a/dbms/src/Storages/Page/V3/WALStore.h b/dbms/src/Storages/Page/V3/WALStore.h index 3dae34a5e27..fe862cbbacf 100644 --- a/dbms/src/Storages/Page/V3/WALStore.h +++ b/dbms/src/Storages/Page/V3/WALStore.h @@ -81,7 +81,7 @@ class WALStore // Some stats for logging UInt64 num_records = 0; - UInt64 dump_elapsed_ms = 0; + UInt64 read_elapsed_ms = 0; // Note that persisted_log_files should not be empty for needSave() == true, // cause we get the largest log num from persisted_log_files as the new diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 4db0540e2b3..59430df244b 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -1163,163 +1163,6 @@ TEST_F(PageDirectoryGCTest, ManyEditsAndDumpSnapshot) } } -TEST_F(PageDirectoryTest, RestoreWithRefToDeletedPage) -try -{ - { - PageEntryV3 entry_1_v1{.file_id = 1, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntriesEdit edit; // ingest - edit.put(buildV3Id(TEST_NAMESPACE_ID, 352), entry_1_v1); - dir->apply(std::move(edit)); - } - { - PageEntriesEdit edit; - edit.ref(buildV3Id(TEST_NAMESPACE_ID, 353), buildV3Id(TEST_NAMESPACE_ID, 352)); - dir->apply(std::move(edit)); - } - { - PageEntriesEdit edit; // ingest done - edit.del(buildV3Id(TEST_NAMESPACE_ID, 352)); - dir->apply(std::move(edit)); - } - - { - auto snap = dir->createSnapshot(); - auto normal_id = getNormalPageIdU64(dir, 353, snap); - EXPECT_EQ(normal_id, 352); - } - - auto s0 = dir->createSnapshot(); - auto edit = dir->dumpSnapshotToEdit(s0); - auto restore_from_edit = [](const PageEntriesEdit & edit) { - auto deseri_edit = u128::Serializer::deserializeFrom(u128::Serializer::serializeTo(edit), nullptr); - auto provider = DB::tests::TiFlashTestEnv::getDefaultFileProvider(); - auto path = getTemporaryPath(); - PSDiskDelegatorPtr delegator = std::make_shared(path); - PageDirectoryFactory factory; - auto d = factory.createFromEditForTest(getCurrentTestName(), provider, delegator, deseri_edit); - return d; - }; - - { - auto restored_dir = restore_from_edit(edit); - auto snap = restored_dir->createSnapshot(); - auto normal_id = getNormalPageIdU64(restored_dir, 353, snap); - EXPECT_EQ(normal_id, 352); - } -} -CATCH - -TEST_F(PageDirectoryTest, IncrRefDuringDump) -try -{ - PageEntryV3 entry_1_v1{.file_id = 50, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - { - PageEntriesEdit edit; - edit.put(buildV3Id(TEST_NAMESPACE_ID, 1), entry_1_v1); - dir->apply(std::move(edit)); - } - { - PageEntriesEdit edit; - edit.ref(buildV3Id(TEST_NAMESPACE_ID, 2), buildV3Id(TEST_NAMESPACE_ID, 1)); - edit.ref(buildV3Id(TEST_NAMESPACE_ID, 3), buildV3Id(TEST_NAMESPACE_ID, 1)); - edit.del(buildV3Id(TEST_NAMESPACE_ID, 1)); - dir->apply(std::move(edit)); - } - { - PageEntriesEdit edit; - edit.del(buildV3Id(TEST_NAMESPACE_ID, 2)); - dir->apply(std::move(edit)); - } - - { - dir->gcInMemEntries({}); - ASSERT_EQ(dir->numPages(), 2); - } - - // create a snap for dump - auto snap = dir->createSnapshot(""); - - // add a ref during dump snapshot - { - PageEntriesEdit edit; - edit.ref(buildV3Id(TEST_NAMESPACE_ID, 5), buildV3Id(TEST_NAMESPACE_ID, 3)); - dir->apply(std::move(edit)); - } - - // check the being_ref_count in dumped snapshot is correct - { - auto edit = dir->dumpSnapshotToEdit(snap); - ASSERT_EQ(edit.size(), 3); - const auto & records = edit.getRecords(); - ASSERT_EQ(records[0].type, EditRecordType::VAR_ENTRY); - ASSERT_EQ(records[0].being_ref_count, 2); - ASSERT_EQ(records[1].type, EditRecordType::VAR_DELETE); - ASSERT_EQ(records[2].type, EditRecordType::VAR_REF); - } - - { - auto edit = dir->dumpSnapshotToEdit(); - ASSERT_EQ(edit.size(), 4); - const auto & records = edit.getRecords(); - ASSERT_EQ(records[0].type, EditRecordType::VAR_ENTRY); - ASSERT_EQ(records[0].being_ref_count, 3); - ASSERT_EQ(records[1].type, EditRecordType::VAR_DELETE); - ASSERT_EQ(records[2].type, EditRecordType::VAR_REF); - ASSERT_EQ(records[3].type, EditRecordType::VAR_REF); - } -} -CATCH - -TEST(MultiVersionRefCount, RefAndCollapse) -try -{ - MultiVersionRefCount ref_counts; - { - ref_counts.incrRefCount(PageVersion(2), 1); - ref_counts.incrRefCount(PageVersion(4), 2); - ref_counts.incrRefCount(PageVersion(8), 1); - ASSERT_EQ(ref_counts.getRefCountInSnap(1), 1); - ASSERT_EQ(ref_counts.getRefCountInSnap(2), 2); - ASSERT_EQ(ref_counts.getRefCountInSnap(8), 5); - ASSERT_EQ(ref_counts.getLatestRefCount(), 5); - } - - // decr ref and collapse - { - ASSERT_EQ(ref_counts.versioned_ref_counts->size(), 3); - ref_counts.decrRefCountInSnap(4, 2); - ASSERT_EQ(ref_counts.versioned_ref_counts->size(), 2); - ASSERT_EQ(ref_counts.getRefCountInSnap(4), 2); - ASSERT_EQ(ref_counts.getRefCountInSnap(8), 3); - ref_counts.decrRefCountInSnap(10, 2); - ASSERT_EQ(ref_counts.getLatestRefCount(), 1); - ASSERT_EQ(ref_counts.versioned_ref_counts, nullptr); - } -} -CATCH - -TEST(MultiVersionRefCount, DecrRefWithSeq0) -try -{ - MultiVersionRefCount ref_counts; - { - ref_counts.incrRefCount(PageVersion(2), 1); - ref_counts.incrRefCount(PageVersion(3), 1); - ref_counts.incrRefCount(PageVersion(4), 1); - ref_counts.incrRefCount(PageVersion(8), 1); - ref_counts.incrRefCount(PageVersion(9), 1); - ASSERT_EQ(ref_counts.versioned_ref_counts->size(), 5); - } - - { - ref_counts.decrRefCountInSnap(0, 2); - ASSERT_EQ(ref_counts.versioned_ref_counts->size(), 6); - ASSERT_EQ(ref_counts.getLatestRefCount(), 4); - } -} // namespace PS::V3::tests -CATCH - TEST_F(PageDirectoryGCTest, GCPushForward) try { @@ -2471,64 +2314,6 @@ try } CATCH -TEST_F(PageDirectoryGCTest, IncrRefDuringGC) -try -{ - PageEntryV3 entry_1_v1{.file_id = 50, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - { - PageEntriesEdit edit; - edit.put(buildV3Id(TEST_NAMESPACE_ID, 1), entry_1_v1); - dir->apply(std::move(edit)); - } - { - PageEntriesEdit edit; - edit.ref(buildV3Id(TEST_NAMESPACE_ID, 2), buildV3Id(TEST_NAMESPACE_ID, 1)); - edit.ref(buildV3Id(TEST_NAMESPACE_ID, 3), buildV3Id(TEST_NAMESPACE_ID, 1)); - edit.del(buildV3Id(TEST_NAMESPACE_ID, 1)); - dir->apply(std::move(edit)); - } - { - PageEntriesEdit edit; - edit.del(buildV3Id(TEST_NAMESPACE_ID, 2)); - dir->apply(std::move(edit)); - } - - { - // begin gc and stop after get lowest seq - auto sp_gc = SyncPointCtl::enableInScope("after_PageDirectory::doGC_getLowestSeq"); - auto th_gc = std::async([&]() { - dir->gcInMemEntries({}); - }); - sp_gc.waitAndPause(); - - // add a ref during gc - PageEntriesEdit edit; - edit.ref(buildV3Id(TEST_NAMESPACE_ID, 5), buildV3Id(TEST_NAMESPACE_ID, 3)); - dir->apply(std::move(edit)); - - // continue gc and finish - sp_gc.next(); - th_gc.get(); - - ASSERT_EQ(dir->numPages(), 3); - } - - { - auto snap = dir->createSnapshot(); - EXPECT_SAME_ENTRY(entry_1_v1, getEntry(dir, 5, snap)); - } - { - PageEntriesEdit edit; - edit.del(buildV3Id(TEST_NAMESPACE_ID, 3)); - edit.del(buildV3Id(TEST_NAMESPACE_ID, 5)); - dir->apply(std::move(edit)); - } - - dir->gcInMemEntries({}); - ASSERT_EQ(dir->numPages(), 0); -} -CATCH - #undef INSERT_ENTRY_TO #undef INSERT_ENTRY #undef INSERT_ENTRY_ACQ_SNAP diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index eed9e4535c7..763659381b3 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1710,7 +1710,7 @@ try auto done_full_gc = page_storage->gc(); EXPECT_TRUE(done_full_gc); - auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, /* force */ true); + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); ASSERT_TRUE(done_snapshot); { @@ -1784,7 +1784,7 @@ try auto done_full_gc = page_storage->gc(); EXPECT_TRUE(done_full_gc); - auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, /* force */ true); + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); ASSERT_TRUE(done_snapshot); { diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp index ceb71cfb5e9..389c074d432 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_gc.cpp @@ -104,7 +104,7 @@ try snap = page_storage->getSnapshot(""); // let's compact the WAL logs - auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, /* force */ true); + auto done_snapshot = page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, /* force */ true); ASSERT_TRUE(done_snapshot); // let's try full gc, this will not trigger full gc @@ -215,7 +215,7 @@ try th_gc.get(); // wal compact again - page_storage->page_directory->tryDumpSnapshot(nullptr, true); + page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); LOG_INFO(log, "close and restore WAL from disk"); page_storage.reset(); @@ -269,32 +269,29 @@ try } FailPointHelper::enableFailPoint(FailPoints::force_ps_wal_compact); - { - auto sp_gc = getSyncPoint(); - auto th_gc = std::async([&]() { - auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); - ASSERT_EQ(expectFullGCExecute(), done_full_gc); - }); - // let's compact the WAL logs - sp_gc.waitAndPause(); - - { - // the delete timing is decide by `sp_gc` - WriteBatch batch; - batch.delPage(ref_page_id2); - batch.delPage(ref_page_id3); - batch.delPage(ref_page_id4); - page_storage->write(std::move(batch)); - } + auto sp_gc = getSyncPoint(); + auto th_gc = std::async([&]() { + auto done_full_gc = page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); + ASSERT_EQ(expectFullGCExecute(), done_full_gc); + }); + // let's compact the WAL logs + sp_gc.waitAndPause(); - // let's try full gc - sp_gc.next(); - th_gc.get(); + { + // the delete timing is decide by `sp_gc` + WriteBatch batch; + batch.delPage(ref_page_id2); + batch.delPage(ref_page_id3); + batch.delPage(ref_page_id4); + page_storage->write(std::move(batch)); } + // let's try full gc + sp_gc.next(); + th_gc.get(); + // wal compact again - page_storage->gcImpl(/* not_skip */ true, nullptr, nullptr); - page_storage->page_directory->tryDumpSnapshot(nullptr, true); + page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); LOG_INFO(log, "close and restore WAL from disk"); page_storage.reset(); @@ -432,7 +429,7 @@ try th_foreground_write.get(); // wal compact again - page_storage->page_directory->tryDumpSnapshot(nullptr, true); + page_storage->page_directory->tryDumpSnapshot(nullptr, nullptr, true); LOG_INFO(log, "close and restore WAL from disk"); page_storage.reset(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp index 5885aa960bd..3163ca158f7 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_versioned_entries.cpp @@ -360,8 +360,8 @@ try PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersion(2), PageVersion(3)); - entries.incrRefCount(PageVersion(2), PageVersion(4)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); entries.createDelete(PageVersion(5)); // <2, 0> is not available after seq=5, but not get removed @@ -398,8 +398,8 @@ try PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersion(2), PageVersion(3)); - entries.incrRefCount(PageVersion(2), PageVersion(4)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); entries.createDelete(PageVersion(5)); // <2, 0> is not available after seq=5, but not get removed @@ -430,8 +430,8 @@ try PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersion(2), PageVersion(3)); - entries.incrRefCount(PageVersion(2), PageVersion(4)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); INSERT_GC_ENTRY(2, 1); INSERT_GC_ENTRY(2, 2); @@ -463,8 +463,8 @@ try PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersion(2), PageVersion(3)); - entries.incrRefCount(PageVersion(2), PageVersion(4)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); INSERT_GC_ENTRY(2, 1); INSERT_GC_ENTRY(2, 2); entries.createDelete(PageVersion(5)); diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index 432b53e249a..89ec9407f35 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -9631,7 +9631,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(tiflash_storage_page_command_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance, type)", + "expr": "sum(increase(tiflash_storage_page_command_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance, type)", "format": "time_series", "hide": false, "interval": "",