diff --git a/dbms/src/Storages/Page/V3/LogFile/LogFilename.h b/dbms/src/Storages/Page/V3/LogFile/LogFilename.h index d3b53ffa1e4..10b74793690 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogFilename.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogFilename.h @@ -16,6 +16,7 @@ #include +#include #include namespace DB::PS::V3 @@ -56,6 +57,11 @@ struct LogFilename assert(!parent_path.empty()); return fmt::format("{}/{}", parent_path, filename(file_stage)); } + + bool operator==(const LogFilename & other) const + { + return log_num == other.log_num && level_num == other.level_num; + } }; struct LogFilenameCmp @@ -70,3 +76,18 @@ struct LogFilenameCmp using LogFilenameSet = std::set; } // namespace DB::PS::V3 + +namespace std +{ +template <> +struct hash +{ + size_t operator()(const DB::PS::V3::LogFilename & name) const + { + size_t seed = 0; + boost::hash_combine(seed, boost::hash_value(name.log_num)); + boost::hash_combine(seed, boost::hash_value(name.level_num)); + return seed; + } +}; +} // namespace std diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 80a80216b2c..71cc621621a 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1559,6 +1559,7 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, watch.restart(); SCOPE_EXIT({ GET_METRIC(tiflash_storage_page_write_duration_seconds, type_commit).Observe(watch.elapsedSeconds()); }); + SYNC_FOR("before_PageDirectory::apply_to_memory"); std::unordered_set applied_data_files; { std::unique_lock table_lock(table_rw_mutex); @@ -1816,22 +1817,21 @@ PageDirectory::getEntriesByBlobIds(const std::vector & blob_i template bool PageDirectory::tryDumpSnapshot(const WriteLimiterPtr & write_limiter, bool force) { + auto identifier = fmt::format("{}.dump", wal->name()); + auto snap = createSnapshot(identifier); + // Only apply compact logs when files snapshot is valid - auto files_snap = wal->tryGetFilesSnapshot(max_persisted_log_files, force); + auto files_snap = wal->tryGetFilesSnapshot( + max_persisted_log_files, + snap->sequence, + details::getMaxSequenceForRecord, + force); if (!files_snap.isValid()) return false; - // To prevent writes from affecting dumping snapshot (and vice versa), old log files - // are read from disk and a temporary PageDirectory is generated for dumping snapshot. - // The main reason write affect dumping snapshot is that we can not get a read-only - // `being_ref_count` by the function `createSnapshot()`. assert(!files_snap.persisted_log_files.empty()); // should not be empty - auto log_num = files_snap.persisted_log_files.rbegin()->log_num; - auto identifier = fmt::format("{}.dump_{}", wal->name(), log_num); Stopwatch watch; - // The records persisted in `files_snap` is older than or equal to all records in `edit` - auto snap = createSnapshot(identifier); auto edit = dumpSnapshotToEdit(snap); files_snap.num_records = edit.size(); files_snap.dump_elapsed_ms = watch.elapsedMilliseconds(); diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 6464ab487f2..abcdabf369b 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -152,9 +152,18 @@ class MultiVersionRefCount new_versioned_ref_counts->emplace_back(ver, ref_count_delta); } } - if (ref_count_delta_in_snap == 0 && new_versioned_ref_counts->empty()) + if (ref_count_delta_in_snap == 0) { - versioned_ref_counts = nullptr; + if (new_versioned_ref_counts->empty()) + { + versioned_ref_counts = nullptr; + } + else + { + // There could be some new ref count created after `snap_seq`, we need to + // keep the newly added ref counts + versioned_ref_counts.swap(new_versioned_ref_counts); + } return; } RUNTIME_CHECK(ref_count_delta_in_snap > 0, deref_count_delta, ref_count_delta_in_snap); @@ -623,6 +632,19 @@ class PageDirectory LoggerPtr log; }; + +namespace details +{ +template +UInt64 getMaxSequenceForRecord(const String & record) +{ + auto edit = Trait::Serializer::deserializeFrom(record, nullptr); + const auto & records = edit.getRecords(); + RUNTIME_CHECK(!records.empty()); + return records.back().version.sequence; +} +} // namespace details + namespace u128 { struct PageDirectoryTrait diff --git a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp index 46ccceddfc7..28cede00950 100644 --- a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp +++ b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp @@ -153,6 +153,56 @@ WALStoreReaderPtr WALStoreReader::create( return create(std::move(storage_name), provider, std::move(log_files), recovery_mode_, read_limiter); } +LogReaderPtr WALStoreReader::createLogReader( + const LogFilename & filename, + FileProviderPtr & provider, + ReportCollector * reporter, + WALRecoveryMode recovery_mode, + const ReadLimiterPtr & read_limiter, + LoggerPtr logger) +{ + const auto log_num = filename.log_num; + const auto fullname = filename.fullname(filename.stage); + LOG_DEBUG(logger, "Open log file for reading [file={}]", fullname); + + auto read_buf = createReadBufferFromFileBaseByFileProvider( + provider, + fullname, + EncryptionPath{fullname, ""}, + /*estimated_size*/ Format::BLOCK_SIZE, + /*aio_threshold*/ 0, + /*read_limiter*/ read_limiter, + /*buffer_size*/ Format::BLOCK_SIZE // Must be `Format::BLOCK_SIZE` + ); + return std::make_unique( + std::move(read_buf), + reporter, + /*verify_checksum*/ true, + log_num, + recovery_mode); +} + +String WALStoreReader::getLastRecordInLogFile( + const LogFilename & filename, + FileProviderPtr & provider, + WALRecoveryMode recovery_mode, + const ReadLimiterPtr & read_limiter, + LoggerPtr logger) +{ + ReportCollector reporter; + auto log_reader = createLogReader(filename, provider, &reporter, recovery_mode, read_limiter, logger); + String last_record; + while (true) + { + auto [ok, record] = log_reader->readRecord(); + if (!ok) + break; + + last_record = std::move(record); + } + return last_record; +} + WALStoreReader::WALStoreReader(String storage_name, FileProviderPtr & provider_, std::optional checkpoint, @@ -209,37 +259,14 @@ bool WALStoreReader::openNextFile() return false; } - auto do_open = [this](const LogFilename & next_file) { - const auto log_num = next_file.log_num; - const auto filename = next_file.filename(next_file.stage); - const auto fullname = next_file.fullname(next_file.stage); - LOG_DEBUG(logger, "Open log file for reading [file={}]", fullname); - - auto read_buf = createReadBufferFromFileBaseByFileProvider( - provider, - fullname, - EncryptionPath{fullname, ""}, - /*estimated_size*/ Format::BLOCK_SIZE, - /*aio_threshold*/ 0, - /*read_limiter*/ read_limiter, - /*buffer_size*/ Format::BLOCK_SIZE // Must be `Format::BLOCK_SIZE` - ); - reader = std::make_unique( - std::move(read_buf), - &reporter, - /*verify_checksum*/ true, - log_num, - recovery_mode); - }; - if (!checkpoint_read_done) { - do_open(*checkpoint_file); + reader = createLogReader(*checkpoint_file, provider, &reporter, recovery_mode, read_limiter, logger); checkpoint_read_done = true; } else { - do_open(*next_reading_file); + reader = createLogReader(*next_reading_file, provider, &reporter, recovery_mode, read_limiter, logger); ++next_reading_file; } return true; diff --git a/dbms/src/Storages/Page/V3/WAL/WALReader.h b/dbms/src/Storages/Page/V3/WAL/WALReader.h index ac3ff228990..56d93af8f91 100644 --- a/dbms/src/Storages/Page/V3/WAL/WALReader.h +++ b/dbms/src/Storages/Page/V3/WAL/WALReader.h @@ -48,6 +48,8 @@ class ReportCollector : public LogReader::Reporter bool error_happened = false; }; +using LogReaderPtr = std::unique_ptr; + class WALStoreReader { public: @@ -67,6 +69,21 @@ class WALStoreReader WALRecoveryMode recovery_mode_ = WALRecoveryMode::TolerateCorruptedTailRecords, const ReadLimiterPtr & read_limiter = nullptr); + static LogReaderPtr createLogReader( + const LogFilename & filename, + FileProviderPtr & provider, + ReportCollector * reporter, + WALRecoveryMode recovery_mode, + const ReadLimiterPtr & read_limiter, + LoggerPtr logger); + + static String getLastRecordInLogFile( + const LogFilename & filename, + FileProviderPtr & provider, + WALRecoveryMode recovery_mode, + const ReadLimiterPtr & read_limiter, + LoggerPtr logger); + bool remained() const; std::optional next(); diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index bc002b6c47f..beb23a7c0c0 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -163,7 +163,7 @@ void WALStore::updateDiskUsage(const LogFilenameSet & log_filenames) } } -WALStore::FilesSnapshot WALStore::tryGetFilesSnapshot(size_t max_persisted_log_files, bool force) +WALStore::FilesSnapshot WALStore::tryGetFilesSnapshot(size_t max_persisted_log_files, UInt64 snap_sequence, std::function max_sequence_getter, bool force) { // First we simply check whether the number of files is enough for compaction LogFilenameSet persisted_log_files = WALStoreReader::listAllFiles(delegator, logger); @@ -192,15 +192,23 @@ WALStore::FilesSnapshot WALStore::tryGetFilesSnapshot(size_t max_persisted_log_f log_file.reset(); } - for (auto iter = persisted_log_files.begin(); iter != persisted_log_files.end(); /*empty*/) + // traverse in reverse order, + // so that once the first log file whose max sequence is smaller or equal to snap_sequence is found, + // we don't need to check the max sequence for the rest log files. + bool found_log_file_smaller_than_snap_sequence = false; + LogFilenameSet snap_log_files; + for (auto iter = persisted_log_files.rbegin(); iter != persisted_log_files.rend(); ++iter) // NOLINT { if (iter->log_num >= current_writing_log_num) - iter = persisted_log_files.erase(iter); - else - ++iter; + continue; + if (!found_log_file_smaller_than_snap_sequence && getLogFileMaxSequence(*iter, max_sequence_getter) > snap_sequence) + continue; + + found_log_file_smaller_than_snap_sequence = true; + snap_log_files.emplace(*iter); } return WALStore::FilesSnapshot{ - .persisted_log_files = std::move(persisted_log_files), + .persisted_log_files = std::move(snap_log_files), }; } @@ -242,11 +250,7 @@ bool WALStore::saveSnapshot( LOG_INFO(logger, "Rename log file to normal done [tempname={}] [fullname={}]", temp_fullname, normal_fullname); // Remove compacted log files. - for (const auto & filename : files_snap.persisted_log_files) - { - const auto log_fullname = filename.fullname(LogFileStage::Normal); - provider->deleteRegularFile(log_fullname, EncryptionPath(log_fullname, "")); - } + removeLogFiles(files_snap.persisted_log_files); auto get_logging_str = [&]() { FmtBuffer fmt_buf; @@ -270,4 +274,35 @@ bool WALStore::saveSnapshot( return true; } +void WALStore::removeLogFiles(const LogFilenameSet & log_filenames) +{ + for (const auto & filename : log_filenames) + { + const auto log_fullname = filename.fullname(LogFileStage::Normal); + provider->deleteRegularFile(log_fullname, EncryptionPath(log_fullname, "")); + } + { + std::unique_lock lock(log_file_max_sequences_cache_mutex); + for (const auto & filename : log_filenames) + { + log_file_max_sequences_cache.erase(filename); + } + } +} + +UInt64 WALStore::getLogFileMaxSequence(const LogFilename & log_filename, std::function max_sequence_getter) +{ + std::unique_lock lock(log_file_max_sequences_cache_mutex); + auto iter = log_file_max_sequences_cache.find(log_filename); + if (iter != log_file_max_sequences_cache.end()) + return iter->second; + + auto last_record = WALStoreReader::getLastRecordInLogFile(log_filename, provider, config.getRecoverMode(), /*read_limiter*/ nullptr, logger); + if (last_record.empty()) + return 0; // empty log file + + UInt64 max_sequence = max_sequence_getter(last_record); + log_file_max_sequences_cache.emplace(log_filename, max_sequence); + return max_sequence; +} } // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/WALStore.h b/dbms/src/Storages/Page/V3/WALStore.h index 3dae34a5e27..1101b5cec66 100644 --- a/dbms/src/Storages/Page/V3/WALStore.h +++ b/dbms/src/Storages/Page/V3/WALStore.h @@ -92,7 +92,7 @@ class WALStore } }; - FilesSnapshot tryGetFilesSnapshot(size_t max_persisted_log_files, bool force); + FilesSnapshot tryGetFilesSnapshot(size_t max_persisted_log_files, UInt64 snap_sequence, std::function max_sequence_getter, bool force); bool saveSnapshot( FilesSnapshot && files_snap, @@ -119,6 +119,10 @@ class WALStore void updateDiskUsage(const LogFilenameSet & log_filenames); + void removeLogFiles(const LogFilenameSet & log_filenames); + + UInt64 getLogFileMaxSequence(const LogFilename & log_filename, std::function max_sequence_getter); + private: const String storage_name; PSDiskDelegatorPtr delegator; @@ -129,6 +133,9 @@ class WALStore UInt32 wal_paths_index; std::unique_ptr log_file; + mutable std::mutex log_file_max_sequences_cache_mutex; + std::unordered_map log_file_max_sequences_cache; + // Cached values when `tryGetFilesSnapshot` is called mutable std::mutex mtx_disk_usage; size_t num_log_files; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 4db0540e2b3..fed9ba67b61 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -1163,6 +1163,53 @@ TEST_F(PageDirectoryGCTest, ManyEditsAndDumpSnapshot) } } +TEST_F(PageDirectoryGCTest, DumpSnapshotDuringWrite) +{ + // write some data and roll the log file + for (size_t i = 0; i < 100; ++i) + { + INSERT_ENTRY(i + 50, i); + } + ASSERT_TRUE(dir->tryDumpSnapshot(nullptr, true)); + + // write a few more data + for (size_t i = 100; i < 110; ++i) + { + INSERT_ENTRY(i + 50, i); + } + + auto sp_before_apply_memory = SyncPointCtl::enableInScope("before_PageDirectory::apply_to_memory"); + auto th_write1 = std::async([&]() { + PageEntriesEdit edit; + PageEntryV3 entry_1_v1{.file_id = 1, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 5352), entry_1_v1); + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 5353), buildV3Id(TEST_NAMESPACE_ID, 5352)); + dir->apply(std::move(edit)); + }); + sp_before_apply_memory.waitAndPause(); + + // dump snapshot during write + ASSERT_TRUE(dir->tryDumpSnapshot(nullptr, true)); + + sp_before_apply_memory.next(); + th_write1.get(); + + { + auto snap = dir->createSnapshot(); + auto normal_id = getNormalPageIdU64(dir, 5353, snap); + EXPECT_EQ(normal_id, 5352); + } + + dir.reset(); + + dir = restoreFromDisk(); + { + auto snap = dir->createSnapshot(); + auto normal_id = getNormalPageIdU64(dir, 5353, snap); + EXPECT_EQ(normal_id, 5352); + } +} + TEST_F(PageDirectoryTest, RestoreWithRefToDeletedPage) try { @@ -2529,6 +2576,52 @@ try } CATCH + +TEST_F(PageDirectoryGCTest, IncrRefDuringGC2) +try +{ + PageEntryV3 entry_1_v1{.file_id = 50, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 1), entry_1_v1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 2), buildV3Id(TEST_NAMESPACE_ID, 1)); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.del(buildV3Id(TEST_NAMESPACE_ID, 2)); + dir->apply(std::move(edit)); + } + + auto after_get_gc_seq = SyncPointCtl::enableInScope("after_PageDirectory::doGC_getLowestSeq"); + auto th_gc = std::async([&]() { + dir->gcInMemEntries({}); + }); + after_get_gc_seq.waitAndPause(); + + // add a ref during gcInMemEntries + { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 5), buildV3Id(TEST_NAMESPACE_ID, 1)); + dir->apply(std::move(edit)); + } + + after_get_gc_seq.next(); + th_gc.get(); + + { + auto snap = dir->createSnapshot(); + auto normal_id = getNormalPageIdU64(dir, 5, snap); + EXPECT_EQ(normal_id, 1); + ASSERT_EQ(dir->numPages(), 2); + } +} +CATCH + #undef INSERT_ENTRY_TO #undef INSERT_ENTRY #undef INSERT_ENTRY_ACQ_SNAP diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp index db92eba52b6..d29d1ecab05 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -854,7 +855,7 @@ TEST_P(WALStoreTest, GetFileSnapshot) ASSERT_NE(wal, nullptr); // running gc right before any writes is skip - ASSERT_FALSE(wal->tryGetFilesSnapshot(1, false).isValid()); + ASSERT_FALSE(wal->tryGetFilesSnapshot(1, std::numeric_limits::max(), details::getMaxSequenceForRecord, false).isValid()); // generate log_1_0, log_2_0, log_3_0 rollToNewLogWriter(wal); @@ -863,17 +864,17 @@ TEST_P(WALStoreTest, GetFileSnapshot) ASSERT_EQ(getNumLogFiles(), 3); // num of files not exceed 5, skip - ASSERT_FALSE(wal->tryGetFilesSnapshot(5, false).isValid()); + ASSERT_FALSE(wal->tryGetFilesSnapshot(5, std::numeric_limits::max(), details::getMaxSequenceForRecord, false).isValid()); // num of files not exceed 3, skip - ASSERT_FALSE(wal->tryGetFilesSnapshot(3, false).isValid()); + ASSERT_FALSE(wal->tryGetFilesSnapshot(3, std::numeric_limits::max(), details::getMaxSequenceForRecord, false).isValid()); // num of files not exceed 3, but still valid when `force` is true - ASSERT_TRUE(wal->tryGetFilesSnapshot(3, true).isValid()); + ASSERT_TRUE(wal->tryGetFilesSnapshot(3, std::numeric_limits::max(), details::getMaxSequenceForRecord, true).isValid()); rollToNewLogWriter(wal); // num of files exceed 3, return { ASSERT_EQ(getNumLogFiles(), 4); - auto files = wal->tryGetFilesSnapshot(3, false); + auto files = wal->tryGetFilesSnapshot(3, std::numeric_limits::max(), details::getMaxSequenceForRecord, false); ASSERT_TRUE(files.isValid()); ASSERT_EQ(files.persisted_log_files.size(), 4); ASSERT_EQ(files.persisted_log_files.begin()->log_num, 1); @@ -890,7 +891,7 @@ TEST_P(WALStoreTest, GetFileSnapshot) { ASSERT_EQ(getNumLogFiles(), 5); - auto files = wal->tryGetFilesSnapshot(3, false); + auto files = wal->tryGetFilesSnapshot(3, std::numeric_limits::max(), details::getMaxSequenceForRecord, false); ASSERT_TRUE(files.isValid()); ASSERT_EQ(files.persisted_log_files.size(), 5); ASSERT_EQ(files.persisted_log_files.begin()->log_num, 1);