// Review notes for the patch below. The patch is a git unified diff over five files under dbms/src;
// these notes sit before the first `diff --git` header and are not part of any hunk.
//
// * Common/CurrentMetrics.cpp: adds M(PSWriterQueueSize) to the metric list.
//
// * Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp (writeEditsAndApplyCheckpointInfo):
//   the existing sequence (data_source->read, data_writer->write, stats and checkpoint_info update,
//   locked_files.emplace) is moved into a try block with the same statements. The new catch (...)
//   logs the record with LOG_ERROR, calls tryLogCurrentException, and rethrows, so the exception
//   still propagates to the caller.
//   NOTE(review): the logged text says "failed to read page" although the try block also covers the
//   write and both RUNTIME_CHECKs - confirm the wording is intended.
//
// * Storages/Page/V3/LogFile/LogWriter.h / LogWriter.cpp: flush() gains a third parameter `bool sync`,
//   declared with default `true` in the header; log_file->fsync() now runs only when sync is true.
//   The two flush calls inside addRecord that free space in write_buffer pass sync = false, and the
//   call at the end of addRecord changes from `if (!manual_flush) flush(...)` to
//   `flush(write_limiter, background, !manual_flush)`. Two #include lines are added to LogWriter.cpp.
//   NOTE(review): with manual_flush set, the end-of-record flush used to be skipped and is now executed
//   without fsync - presumably intentional; confirm against callers that rely on manual flushing.
//
// * Storages/Page/V3/PageDirectory.cpp (apply): declares the PSWriterQueueSize metric and sets it to
//   writers.size() right after writers.push_back(&w).
//   NOTE(review): no hunk shown here updates the metric when a writer leaves the queue - verify that
//   the value does not go stale.
//
// NOTE(review): several tokens look truncated in this copy of the patch (#include lines without a
// header name, `static_cast(id_storage_type)`, `std::unordered_set PageDirectory::apply`); presumably
// angle-bracketed text was lost during extraction - compare with the original patch before applying.
diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 1c302b6e231..66511160c15 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -28,6 +28,7 @@ M(PSMVCCSnapshotsList) \ M(PSMVCCNumDelta) \ M(PSMVCCNumBase) \ + M(PSWriterQueueSize) \ M(RWLockWaitingReaders) \ M(RWLockWaitingWriters) \ M(RWLockActiveReaders) \ diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp index 886011a6e13..d6a75f63bcc 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp @@ -161,33 +161,42 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo( // 2. For entry edits without the checkpoint info, or it is stored on an existing data file that needs compact, // write the entry data to the data file, and assign a new checkpoint info. - auto page = data_source->read({rec_edit.page_id, rec_edit.entry}); - RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit); - auto data_location = data_writer->write( - rec_edit.page_id, - rec_edit.version, - page.data.begin(), - page.data.size()); - // the page data size uploaded in this checkpoint - write_down_stats.num_bytes[static_cast(id_storage_type)] += rec_edit.entry.size; - current_write_size += data_location.size_in_file; - RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size); - bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed; - rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{ - .data_location = data_location, - .is_valid = true, - .is_local_data_reclaimed = is_local_data_reclaimed, - }; - locked_files.emplace(*data_location.data_file_id); - if (is_compaction) + try { - write_down_stats.compact_data_bytes += rec_edit.entry.size; - 
write_down_stats.num_pages_compact += 1; + auto page = data_source->read({rec_edit.page_id, rec_edit.entry}); + RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit); + auto data_location = data_writer->write( + rec_edit.page_id, + rec_edit.version, + page.data.begin(), + page.data.size()); + // the page data size uploaded in this checkpoint + write_down_stats.num_bytes[static_cast(id_storage_type)] += rec_edit.entry.size; + current_write_size += data_location.size_in_file; + RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size); + bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed; + rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{ + .data_location = data_location, + .is_valid = true, + .is_local_data_reclaimed = is_local_data_reclaimed, + }; + locked_files.emplace(*data_location.data_file_id); + if (is_compaction) + { + write_down_stats.compact_data_bytes += rec_edit.entry.size; + write_down_stats.num_pages_compact += 1; + } + else + { + write_down_stats.incremental_data_bytes += rec_edit.entry.size; + write_down_stats.num_pages_incremental += 1; + } } - else + catch (...) 
{ - write_down_stats.incremental_data_bytes += rec_edit.entry.size; - write_down_stats.num_pages_incremental += 1; + LOG_ERROR(log, "failed to read page, record={}", rec_edit); + tryLogCurrentException(__PRETTY_FUNCTION__); + throw; } } diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp index 48442526ce2..8c171699660 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include #include #include @@ -69,7 +71,7 @@ size_t LogWriter::writtenBytes() const return written_bytes; } -void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background) +void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background, bool sync) { if (write_buffer.offset() == 0) { @@ -84,7 +86,10 @@ void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background) /*background=*/background, /*truncate_if_failed=*/false, /*enable_failpoint=*/false); - log_file->fsync(); + if (sync) + { + log_file->fsync(); + } written_bytes += write_buffer.offset(); // reset the write_buffer @@ -115,7 +120,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const static constexpr char MAX_ZERO_HEADER[Format::RECYCLABLE_HEADER_SIZE]{'\x00'}; if (unlikely(buffer_size - write_buffer.offset() < leftover)) { - flush(write_limiter, background); + flush(write_limiter, background, false); } writeString(MAX_ZERO_HEADER, leftover, write_buffer); block_offset = 0; @@ -141,7 +146,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const // Check available space in write_buffer before writing if (buffer_size - write_buffer.offset() < fragment_length + header_size) { - flush(write_limiter, background); + flush(write_limiter, background, false); } try { @@ -158,10 +163,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, 
const begin = false; } while (payload.hasPendingData()); - if (!manual_flush) - { - flush(write_limiter, background); - } + flush(write_limiter, background, !manual_flush); } void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length) diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h index 8b1b791b67c..9afc5ee655a 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h @@ -87,7 +87,7 @@ class LogWriter final : private Allocator void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr, bool background = false); - void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false); + void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false, bool sync = true); void close(); diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index e998b7e0108..aa03320ee25 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -50,6 +50,7 @@ namespace CurrentMetrics { extern const Metric PSMVCCSnapshotsList; +extern const Metric PSWriterQueueSize; } // namespace CurrentMetrics namespace DB @@ -1484,6 +1485,7 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, watch.restart(); writers.push_back(&w); + CurrentMetrics::set(CurrentMetrics::PSWriterQueueSize, writers.size()); w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); }); GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds()); watch.restart(); @@ -1504,7 +1506,6 @@ std::unordered_set PageDirectory::apply(PageEntriesEdit && edit, // group owner, others just return an empty set. return {}; } - auto * last_writer = buildWriteGroup(&w, apply_lock); apply_lock.unlock();