From 1607a0ecfc84f67e74130a095dc50fa4db43d46b Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 8 Jun 2023 13:39:39 +0800 Subject: [PATCH 1/6] remove unnecessary fsync --- dbms/src/Common/CurrentMetrics.cpp | 1 + .../Page/V3/CheckpointFile/CPFilesWriter.cpp | 57 +++++++++++-------- .../Storages/Page/V3/LogFile/LogWriter.cpp | 18 +++--- dbms/src/Storages/Page/V3/LogFile/LogWriter.h | 2 +- dbms/src/Storages/Page/V3/PageDirectory.cpp | 3 +- 5 files changed, 47 insertions(+), 34 deletions(-) diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 1c302b6e231..66511160c15 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -28,6 +28,7 @@ M(PSMVCCSnapshotsList) \ M(PSMVCCNumDelta) \ M(PSMVCCNumBase) \ + M(PSWriterQueueSize) \ M(RWLockWaitingReaders) \ M(RWLockWaitingWriters) \ M(RWLockActiveReaders) \ diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp index 886011a6e13..d6a75f63bcc 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp @@ -161,33 +161,42 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo( // 2. For entry edits without the checkpoint info, or it is stored on an existing data file that needs compact, // write the entry data to the data file, and assign a new checkpoint info. 
- auto page = data_source->read({rec_edit.page_id, rec_edit.entry}); - RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit); - auto data_location = data_writer->write( - rec_edit.page_id, - rec_edit.version, - page.data.begin(), - page.data.size()); - // the page data size uploaded in this checkpoint - write_down_stats.num_bytes[static_cast(id_storage_type)] += rec_edit.entry.size; - current_write_size += data_location.size_in_file; - RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size); - bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed; - rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{ - .data_location = data_location, - .is_valid = true, - .is_local_data_reclaimed = is_local_data_reclaimed, - }; - locked_files.emplace(*data_location.data_file_id); - if (is_compaction) + try { - write_down_stats.compact_data_bytes += rec_edit.entry.size; - write_down_stats.num_pages_compact += 1; + auto page = data_source->read({rec_edit.page_id, rec_edit.entry}); + RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit); + auto data_location = data_writer->write( + rec_edit.page_id, + rec_edit.version, + page.data.begin(), + page.data.size()); + // the page data size uploaded in this checkpoint + write_down_stats.num_bytes[static_cast(id_storage_type)] += rec_edit.entry.size; + current_write_size += data_location.size_in_file; + RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size); + bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed; + rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{ + .data_location = data_location, + .is_valid = true, + .is_local_data_reclaimed = is_local_data_reclaimed, + }; + locked_files.emplace(*data_location.data_file_id); + if (is_compaction) + { + 
write_down_stats.compact_data_bytes += rec_edit.entry.size; + write_down_stats.num_pages_compact += 1; + } + else + { + write_down_stats.incremental_data_bytes += rec_edit.entry.size; + write_down_stats.num_pages_incremental += 1; + } } - else + catch (...) { - write_down_stats.incremental_data_bytes += rec_edit.entry.size; - write_down_stats.num_pages_incremental += 1; + LOG_ERROR(log, "failed to read page, record={}", rec_edit); + tryLogCurrentException(__PRETTY_FUNCTION__); + throw; } } diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp index 48442526ce2..8c171699660 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include #include #include @@ -69,7 +71,7 @@ size_t LogWriter::writtenBytes() const return written_bytes; } -void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background) +void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background, bool sync) { if (write_buffer.offset() == 0) { @@ -84,7 +86,10 @@ void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background) /*background=*/background, /*truncate_if_failed=*/false, /*enable_failpoint=*/false); - log_file->fsync(); + if (sync) + { + log_file->fsync(); + } written_bytes += write_buffer.offset(); // reset the write_buffer @@ -115,7 +120,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const static constexpr char MAX_ZERO_HEADER[Format::RECYCLABLE_HEADER_SIZE]{'\x00'}; if (unlikely(buffer_size - write_buffer.offset() < leftover)) { - flush(write_limiter, background); + flush(write_limiter, background, false); } writeString(MAX_ZERO_HEADER, leftover, write_buffer); block_offset = 0; @@ -141,7 +146,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const // Check available space in write_buffer before writing if 
(buffer_size - write_buffer.offset() < fragment_length + header_size) { - flush(write_limiter, background); + flush(write_limiter, background, false); } try { @@ -158,10 +163,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const begin = false; } while (payload.hasPendingData()); - if (!manual_flush) - { - flush(write_limiter, background); - } + flush(write_limiter, background, !manual_flush); } void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length) diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h index 8b1b791b67c..9afc5ee655a 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h @@ -87,7 +87,7 @@ class LogWriter final : private Allocator void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr, bool background = false); - void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false); + void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false, bool sync = true); void close(); diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index e998b7e0108..aa03320ee25 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -50,6 +50,7 @@ namespace CurrentMetrics { extern const Metric PSMVCCSnapshotsList; +extern const Metric PSWriterQueueSize; } // namespace CurrentMetrics namespace DB @@ -1484,6 +1485,7 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, watch.restart(); writers.push_back(&w); + CurrentMetrics::set(CurrentMetrics::PSWriterQueueSize, writers.size()); w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); }); GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds()); watch.restart(); @@ -1504,7 
+1506,6 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, // group owner, others just return an empty set. return {}; } - auto * last_writer = buildWriteGroup(&w, apply_lock); apply_lock.unlock(); From f77342dee19104a83162af73e2106be843a7f604 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 8 Jun 2023 14:01:51 +0800 Subject: [PATCH 2/6] small refactor LogWriter --- .../Storages/Page/V3/LogFile/LogWriter.cpp | 62 ++++++++++--------- dbms/src/Storages/Page/V3/LogFile/LogWriter.h | 11 ++-- dbms/src/Storages/Page/V3/WALStore.cpp | 11 ++-- dbms/src/Storages/Page/V3/WALStore.h | 2 +- .../Storages/Page/V3/tests/gtest_wal_log.cpp | 4 +- 5 files changed, 48 insertions(+), 42 deletions(-) diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp index 8c171699660..cfc4709b7e0 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp @@ -15,8 +15,6 @@ #include #include #include -#include -#include #include #include #include @@ -33,13 +31,13 @@ LogWriter::LogWriter( const FileProviderPtr & file_provider_, Format::LogNumberType log_number_, bool recycle_log_files_, - bool manual_flush_) + bool manual_sync_) : path(path_) , file_provider(file_provider_) , block_offset(0) , log_number(log_number_) , recycle_log_files(recycle_log_files_) - , manual_flush(manual_flush_) + , manual_sync(manual_sync_) , write_buffer(nullptr, 0) { log_file = file_provider->newWritableFile( @@ -71,29 +69,9 @@ size_t LogWriter::writtenBytes() const return written_bytes; } -void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background, bool sync) +void LogWriter::sync() { - if (write_buffer.offset() == 0) - { - return; - } - - PageUtil::writeFile(log_file, - written_bytes, - write_buffer.buffer().begin(), - write_buffer.offset(), - write_limiter, - /*background=*/background, - /*truncate_if_failed=*/false, - /*enable_failpoint=*/false); - if (sync) - { - 
log_file->fsync(); - } - written_bytes += write_buffer.offset(); - - // reset the write_buffer - resetBuffer(); + log_file->fsync(); } void LogWriter::close() @@ -120,7 +98,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const static constexpr char MAX_ZERO_HEADER[Format::RECYCLABLE_HEADER_SIZE]{'\x00'}; if (unlikely(buffer_size - write_buffer.offset() < leftover)) { - flush(write_limiter, background, false); + flush(write_limiter, background); } writeString(MAX_ZERO_HEADER, leftover, write_buffer); block_offset = 0; @@ -146,7 +124,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const // Check available space in write_buffer before writing if (buffer_size - write_buffer.offset() < fragment_length + header_size) { - flush(write_limiter, background, false); + flush(write_limiter, background); } try { @@ -163,7 +141,11 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const begin = false; } while (payload.hasPendingData()); - flush(write_limiter, background, !manual_flush); + flush(write_limiter, background); + if (!manual_sync) + { + sync(); + } } void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length) @@ -218,4 +200,26 @@ void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload block_offset += header_size + length; } + +void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background) +{ + if (write_buffer.offset() == 0) + { + return; + } + + PageUtil::writeFile(log_file, + written_bytes, + write_buffer.buffer().begin(), + write_buffer.offset(), + write_limiter, + /*background=*/background, + /*truncate_if_failed=*/false, + /*enable_failpoint=*/false); + + written_bytes += write_buffer.offset(); + + // reset the write_buffer + resetBuffer(); +} } // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h index 
9afc5ee655a..0ee33a9879d 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h @@ -79,7 +79,7 @@ class LogWriter final : private Allocator const FileProviderPtr & file_provider_, Format::LogNumberType log_number_, bool recycle_log_files_, - bool manual_flush_ = false); + bool manual_sync_ = false); DISALLOW_COPY(LogWriter); @@ -87,7 +87,7 @@ class LogWriter final : private Allocator void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr, bool background = false); - void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false, bool sync = true); + void sync(); void close(); @@ -103,6 +103,8 @@ class LogWriter final : private Allocator void resetBuffer(); + void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false); + private: String path; FileProviderPtr file_provider; @@ -112,9 +114,8 @@ class LogWriter final : private Allocator size_t block_offset; // Current offset in block Format::LogNumberType log_number; const bool recycle_log_files; - // If true, it does not flush after each write. 
Instead it relies on the upper - // layer to manually does the flush by calling ::flush() - const bool manual_flush; + // If true, the upper layer needs to manually sync the log file after write by calling LogWriter::sync() + const bool manual_sync; size_t written_bytes = 0; diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index eae95507470..707e2da96fa 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -107,7 +107,7 @@ Format::LogNumberType WALStore::rollToNewLogWriter(const std::lock_guard, LogFilename> WALStore::createLogWriter( const std::pair & new_log_lvl, - bool manual_flush) + bool temp_file) { String path; @@ -130,7 +130,7 @@ std::tuple, LogFilename> WALStore::createLogWriter( path += wal_folder_prefix; LogFilename log_filename = LogFilename{ - (manual_flush ? LogFileStage::Temporary : LogFileStage::Normal), + (temp_file ? LogFileStage::Temporary : LogFileStage::Normal), new_log_lvl.first, new_log_lvl.second, 0, @@ -139,12 +139,13 @@ std::tuple, LogFilename> WALStore::createLogWriter( auto fullname = log_filename.fullname(log_filename.stage); // TODO check whether the file already existed LOG_INFO(logger, "Creating log file for writing [fullname={}]", fullname); + // if it is a temp file, we will manually sync it after writing snapshot auto log_writer = std::make_unique( fullname, provider, new_log_lvl.first, /*recycle*/ true, - /*manual_flush*/ manual_flush); + /*manual_sync*/ temp_file); return {std::move(log_writer), log_filename}; } @@ -218,13 +219,13 @@ bool WALStore::saveSnapshot( // Use {largest_log_num, 1} to save the `edit` const auto log_num = files_snap.persisted_log_files.rbegin()->log_num; // Create a temporary file for saving directory snapshot - auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*manual_flush*/ true); + auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*temp_file*/ true); // TODO: split the snap into 
multiple records in LogFile so that the memory // consumption could be more smooth. ReadBufferFromString payload(serialized_snap); compact_log->addRecord(payload, serialized_snap.size(), write_limiter, /*background*/ true); - compact_log->flush(write_limiter, /*background*/ true); + compact_log->sync(); compact_log.reset(); // close fd explicitly before renaming file. // Rename it to be a normal log file. diff --git a/dbms/src/Storages/Page/V3/WALStore.h b/dbms/src/Storages/Page/V3/WALStore.h index 8ce22ed053e..fe862cbbacf 100644 --- a/dbms/src/Storages/Page/V3/WALStore.h +++ b/dbms/src/Storages/Page/V3/WALStore.h @@ -113,7 +113,7 @@ class WALStore std::tuple, LogFilename> createLogWriter( const std::pair & new_log_lvl, - bool manual_flush); + bool temp_file); Format::LogNumberType rollToNewLogWriter(const std::lock_guard &); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp index 6d744889a84..9df80970e26 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp @@ -810,7 +810,7 @@ INSTANTIATE_TEST_CASE_P( return fmt::format("{}_{}", recycle_log, allow_retry_read); }); -TEST(LogFileRWTest2, ManuallyFlush) +TEST(LogFileRWTest2, ManuallySync) { auto provider = TiFlashTestEnv::getDefaultFileProvider(); auto path = TiFlashTestEnv::getTemporaryPath("LogFileRWTest2"); @@ -835,7 +835,7 @@ TEST(LogFileRWTest2, ManuallyFlush) ReadBufferFromString buff(payload); ASSERT_NO_THROW(writer->addRecord(buff, payload.size())); } - writer->flush(); + writer->sync(); auto read_buf = createReadBufferFromFileBaseByFileProvider( provider, From 96b9678ef90dc969bc3c5aede770769941d12ca0 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 8 Jun 2023 14:57:00 +0800 Subject: [PATCH 3/6] add test for pipeline write --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 2 + dbms/src/Storages/Page/V3/PageDirectory.h | 7 ++ .../Page/V3/tests/gtest_page_directory.cpp | 84 
+++++++++++++++++++ 3 files changed, 93 insertions(+) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index aa03320ee25..6215dfa48c7 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -1485,6 +1485,7 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, watch.restart(); writers.push_back(&w); + SYNC_FOR("after_PageDirectory::enter_write_group"); CurrentMetrics::set(CurrentMetrics::PSWriterQueueSize, writers.size()); w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); }); GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds()); @@ -1508,6 +1509,7 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, } auto * last_writer = buildWriteGroup(&w, apply_lock); apply_lock.unlock(); + SYNC_FOR("before_PageDirectory::leader_apply"); // `true` means the write process has completed without exception bool success = false; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 9877def755d..45732e66dbc 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -414,6 +414,13 @@ class PageDirectory return u; } + // `writers` should be used under the protection of apply_mutex + // So don't use this function in production code + size_t getWritersQueueSizeForTest() + { + return writers.size(); + } + // No copying and no moving DISALLOW_COPY_AND_MOVE(PageDirectory); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 3cabc291af1..59430df244b 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -549,6 +549,90 @@ TEST_F(PageDirectoryTest, RefWontDeadLock) dir->apply(std::move(edit2)); } +TEST_F(PageDirectoryTest, 
BatchWriteSuccess) +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 3, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + + auto sp_before_leader_apply = SyncPointCtl::enableInScope("before_PageDirectory::leader_apply"); + auto th_write1 = std::async([&]() { + PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 1), entry1); + dir->apply(std::move(edit)); + }); + sp_before_leader_apply.waitAndPause(); + + // form a write group + auto sp_after_enter_write_group = SyncPointCtl::enableInScope("after_PageDirectory::enter_write_group"); + auto th_write2 = std::async([&]() { + PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 2), entry2); + dir->apply(std::move(edit)); + }); + auto th_write3 = std::async([&]() { + PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 3), entry3); + dir->apply(std::move(edit)); + }); + sp_after_enter_write_group.waitAndNext(); + sp_after_enter_write_group.waitAndNext(); + ASSERT_EQ(dir->getWritersQueueSizeForTest(), 3); // 3 writers in write group + + sp_before_leader_apply.next(); // continue first leader_apply + th_write1.get(); + + sp_before_leader_apply.waitAndNext(); // continue second leader_apply + th_write2.get(); + th_write3.get(); + ASSERT_EQ(dir->getWritersQueueSizeForTest(), 0); + + auto snap = dir->createSnapshot(); + EXPECT_ENTRY_EQ(entry1, dir, 1, snap); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap); + EXPECT_ENTRY_EQ(entry3, dir, 3, snap); +} + +TEST_F(PageDirectoryTest, BatchWriteException) +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + + auto sp_before_leader_apply = SyncPointCtl::enableInScope("before_PageDirectory::leader_apply"); + auto th_write1 = std::async([&]() { + 
PageEntriesEdit edit; + edit.put(buildV3Id(TEST_NAMESPACE_ID, 1), entry1); + dir->apply(std::move(edit)); + }); + sp_before_leader_apply.waitAndPause(); + + // form a write group + auto sp_after_enter_write_group = SyncPointCtl::enableInScope("after_PageDirectory::enter_write_group"); + auto th_write2 = std::async([&]() { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 2), buildV3Id(TEST_NAMESPACE_ID, 100)); + ASSERT_ANY_THROW(dir->apply(std::move(edit))); + }); + auto th_write3 = std::async([&]() { + PageEntriesEdit edit; + edit.ref(buildV3Id(TEST_NAMESPACE_ID, 3), buildV3Id(TEST_NAMESPACE_ID, 100)); + ASSERT_ANY_THROW(dir->apply(std::move(edit))); + }); + sp_after_enter_write_group.waitAndNext(); + sp_after_enter_write_group.waitAndNext(); + ASSERT_EQ(dir->getWritersQueueSizeForTest(), 3); // 3 writers in write group + + sp_before_leader_apply.next(); // continue first leader_apply + th_write1.get(); + + sp_before_leader_apply.waitAndNext(); // continue second leader_apply + th_write2.get(); + th_write3.get(); + ASSERT_EQ(dir->getWritersQueueSizeForTest(), 0); + + auto snap = dir->createSnapshot(); + EXPECT_ENTRY_EQ(entry1, dir, 1, snap); +} + TEST_F(PageDirectoryTest, IdempotentNewExtPageAfterAllCleaned) { // Make sure creating ext page after itself and all its reference are clean From a335fc9ff9a2cf79f5ccd35ccf9fd3cd798089b2 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 8 Jun 2023 16:15:33 +0800 Subject: [PATCH 4/6] add some metrics --- dbms/src/Common/TiFlashMetrics.h | 3 +++ dbms/src/Storages/Page/V3/BlobStore.cpp | 3 +++ dbms/src/Storages/Page/V3/PageDirectory.cpp | 8 +++++++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index ac6fe0af02b..00b3e750068 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -194,6 +194,9 @@ namespace DB F(type_fullgc_commit, {{"type", "fullgc_commit"}}, ExpBuckets{0.0005, 2, 20}), \ 
F(type_clean_external, {{"type", "clean_external"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \ + M(tiflash_storage_page_command_count, "Total number of PageStorage's command, such as write / read / scan / snapshot", Counter, \ + F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), \ + F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \ M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \ F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \ M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \ diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 3b16a77991f..99b59351b80 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -883,6 +883,7 @@ template typename BlobStore::PageMap BlobStore::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_limiter) { + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); if (entries.empty()) { return {}; @@ -985,6 +986,7 @@ BlobStore::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_l template Page BlobStore::read(const PageIdAndEntry & id_entry, const ReadLimiterPtr & read_limiter) { + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); const auto & [page_id_v3, entry] = id_entry; const size_t buf_size = entry.size; @@ -1044,6 +1046,7 @@ Page BlobStore::read(const PageIdAndEntry & id_entry, const ReadLimiterPt template BlobFilePtr BlobStore::read(const typename BlobStore::PageId & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background) { + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); assert(buffers != nullptr); BlobFilePtr blob_file = getBlobFile(blob_id); try diff --git 
a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 6215dfa48c7..42ee99cef11 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -965,6 +965,7 @@ PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UI template PageDirectorySnapshotPtr PageDirectory::createSnapshot(const String & tracing_id) const { + GET_METRIC(tiflash_storage_page_command_count, type_snapshot).Increment(); auto snap = std::make_shared(sequence.load(), tracing_id); { std::lock_guard snapshots_lock(snapshots_mutex); @@ -1019,6 +1020,7 @@ SnapshotsStatistics PageDirectory::getSnapshotsStat() const template typename PageDirectory::PageIdAndEntry PageDirectory::getByIDImpl(const PageId & page_id, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const { + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); PageEntryV3 entry_got; // After two write batches applied: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10}, the `mvcc_table_directory` is: @@ -1113,6 +1115,7 @@ template std::pair::PageIdAndEntries, typename PageDirectory::PageIds> PageDirectory::getByIDsImpl(const typename PageDirectory::PageIds & page_ids, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const { + GET_METRIC(tiflash_storage_page_command_count, type_read).Increment(); PageEntryV3 entry_got; PageIds page_not_found = {}; @@ -1258,6 +1261,7 @@ UInt64 PageDirectory::getMaxIdAfterRestart() const template typename PageDirectory::PageIdSet PageDirectory::getAllPageIds() { + GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment(); std::set page_ids; std::shared_lock read_lock(table_rw_mutex); @@ -1274,6 +1278,7 @@ typename PageDirectory::PageIdSet PageDirectory::getAllPageIds() template typename PageDirectory::PageIdSet PageDirectory::getAllPageIdsWithPrefix(const String & prefix, const DB::PageStorageSnapshotPtr & snap_) { + 
GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment(); if constexpr (std::is_same_v) { PageIdSet page_ids; @@ -1300,6 +1305,7 @@ typename PageDirectory::PageIdSet PageDirectory::getAllPageIdsWith template typename PageDirectory::PageIdSet PageDirectory::getAllPageIdsInRange(const PageId & start, const PageId & end, const DB::PageStorageSnapshotPtr & snap_) { + GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment(); if constexpr (std::is_same_v) { PageIdSet page_ids; @@ -1474,7 +1480,7 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, // We need to make sure there is only one apply thread to write wal and then increase `sequence`. // Note that, as read threads use current `sequence` as read_seq, we cannot increase `sequence` // before applying edit to `mvcc_table_directory`. - + GET_METRIC(tiflash_storage_page_command_count, type_write).Increment(); Writer w; w.edit = &edit; From 6c2f9c35a94a31cbdf7841a6e5a03b593a760e78 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 8 Jun 2023 18:31:07 +0800 Subject: [PATCH 5/6] add grafana --- dbms/src/Common/CurrentMetrics.cpp | 2 +- dbms/src/Storages/Page/V3/PageDirectory.cpp | 4 +- metrics/grafana/tiflash_summary.json | 230 +++++++++++++++++++- 3 files changed, 224 insertions(+), 12 deletions(-) diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 66511160c15..a476f06c9b0 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -28,7 +28,7 @@ M(PSMVCCSnapshotsList) \ M(PSMVCCNumDelta) \ M(PSMVCCNumBase) \ - M(PSWriterQueueSize) \ + M(PSPendingWriterNum) \ M(RWLockWaitingReaders) \ M(RWLockWaitingWriters) \ M(RWLockActiveReaders) \ diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 42ee99cef11..10eb48abb37 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -50,7 +50,7 @@ namespace 
CurrentMetrics { extern const Metric PSMVCCSnapshotsList; -extern const Metric PSWriterQueueSize; +extern const Metric PSPendingWriterNum; } // namespace CurrentMetrics namespace DB @@ -1481,6 +1481,7 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, // Note that, as read threads use current `sequence` as read_seq, we cannot increase `sequence` // before applying edit to `mvcc_table_directory`. GET_METRIC(tiflash_storage_page_command_count, type_write).Increment(); + CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum}; Writer w; w.edit = &edit; @@ -1492,7 +1493,6 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, writers.push_back(&w); SYNC_FOR("after_PageDirectory::enter_write_group"); - CurrentMetrics::set(CurrentMetrics::PSWriterQueueSize, writers.size()); w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); }); GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds()); watch.restart(); diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index d7afdb8fb52..02fbc4b018c 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -589,7 +589,7 @@ "shared": true, "sort": 2, "value_type": "cumulative" - }, + }, "type": "graph", "xaxis": { "buckets": null, @@ -597,7 +597,7 @@ "name": null, "show": true, "values": [] - }, + }, "yaxes": [ { "format": "short", @@ -620,8 +620,8 @@ "align": false, "alignLevel": null } - }, - { + }, + { "aliasColors": {}, "bars": false, "dashLength": 10, @@ -633,7 +633,7 @@ "fieldConfig": { "defaults": {}, "overrides": [] - }, + }, "fill": 0, "fillGradient": 0, "grid": {}, @@ -642,7 +642,7 @@ "w": 12, "x": 0, "y": 17 - }, + }, "hiddenSeries": false, "id": 51, "legend": { @@ -658,14 +658,14 @@ "sortDesc": true, "total": false, "values": true - }, + }, "lines": true, "linewidth": 1, "links": [], "nullPointMode": "null", "options": { 
"alertThreshold": true - }, + }, "percentage": false, "pluginVersion": "7.5.11", "pointradius": 5, @@ -9562,6 +9562,218 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The num of pending writers in PageStorage", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 40 + }, + "hiddenSeries": false, + "id": 231, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": false, + "hideZero": false, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 250, + "sort": "max", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tiflash_system_current_metric_PSPendingWriterNum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "size-{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "PageStorage Pending Writers Num", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + 
"logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 48 + }, + "hiddenSeries": false, + "id": 232, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "yaxis": 2 + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(increase(tiflash_storage_page_command_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance, type)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{type}}-{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "PS Command OPS By Instance", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": null, + "format": "ops", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "opm", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], 
"title": "PageStorage", @@ -14859,4 +15071,4 @@ "title": "Test-Cluster-TiFlash-Summary", "uid": "SVbh2xUWk", "version": 1 -} +} \ No newline at end of file From c111936963f8aa79137deb39f49ea888e53d747d Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 9 Jun 2023 16:08:12 +0800 Subject: [PATCH 6/6] Add comment Signed-off-by: JaySon-Huang --- dbms/src/Storages/Page/V3/LogFile/LogReader.h | 4 ---- dbms/src/Storages/Page/V3/PageDirectory.h | 3 ++- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/Page/V3/LogFile/LogReader.h b/dbms/src/Storages/Page/V3/LogFile/LogReader.h index 617b2ae1629..65808133bec 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogReader.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogReader.h @@ -20,10 +20,6 @@ #include #include -namespace Poco -{ -class Logger; -} namespace DB { class ReadBuffer; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 45732e66dbc..55834abe4f1 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -462,7 +462,8 @@ class PageDirectory std::condition_variable cv; }; - // return the last writer in the group + // Return the last writer in the group + // All the edit in the write group will be merged into `first->edit`. Writer * buildWriteGroup(Writer * first, std::unique_lock & /*lock*/); private: