Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid unnecessary fsync in ps #7616

Merged
merged 8 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
M(PSMVCCSnapshotsList) \
M(PSMVCCNumDelta) \
M(PSMVCCNumBase) \
M(PSPendingWriterNum) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ namespace DB
F(type_fullgc_commit, {{"type", "fullgc_commit"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_clean_external, {{"type", "clean_external"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_storage_page_command_count, "Total number of PageStorage's command, such as write / read / scan / snapshot", Counter, \
F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), \
F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \
M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \
F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \
M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ template <typename Trait>
typename BlobStore<Trait>::PageMap
BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
if (entries.empty())
{
return {};
Expand Down Expand Up @@ -985,6 +986,7 @@ BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_l
template <typename Trait>
Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
const auto & [page_id_v3, entry] = id_entry;
const size_t buf_size = entry.size;

Expand Down Expand Up @@ -1044,6 +1046,7 @@ Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPt
template <typename Trait>
BlobFilePtr BlobStore<Trait>::read(const typename BlobStore<Trait>::PageId & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
assert(buffers != nullptr);
BlobFilePtr blob_file = getBlobFile(blob_id);
try
Expand Down
57 changes: 33 additions & 24 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,33 +161,42 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(

// 2. For entry edits without the checkpoint info, or those stored on an existing data file that needs compaction,
// write the entry data to the data file, and assign a new checkpoint info.
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
auto data_location = data_writer->write(
rec_edit.page_id,
rec_edit.version,
page.data.begin(),
page.data.size());
// the page data size uploaded in this checkpoint
write_down_stats.num_bytes[static_cast<size_t>(id_storage_type)] += rec_edit.entry.size;
current_write_size += data_location.size_in_file;
RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = data_location,
.is_valid = true,
.is_local_data_reclaimed = is_local_data_reclaimed,
};
locked_files.emplace(*data_location.data_file_id);
if (is_compaction)
try
{
write_down_stats.compact_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_compact += 1;
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
auto data_location = data_writer->write(
rec_edit.page_id,
rec_edit.version,
page.data.begin(),
page.data.size());
// the page data size uploaded in this checkpoint
write_down_stats.num_bytes[static_cast<size_t>(id_storage_type)] += rec_edit.entry.size;
current_write_size += data_location.size_in_file;
RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = data_location,
.is_valid = true,
.is_local_data_reclaimed = is_local_data_reclaimed,
};
locked_files.emplace(*data_location.data_file_id);
if (is_compaction)
{
write_down_stats.compact_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_compact += 1;
}
else
{
write_down_stats.incremental_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_incremental += 1;
}
}
else
catch (...)
{
write_down_stats.incremental_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_incremental += 1;
LOG_ERROR(log, "failed to read page, record={}", rec_edit);
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}

Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/Page/V3/LogFile/LogReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
#include <Storages/Page/V3/WALStore.h>
#include <common/types.h>

namespace Poco
{
class Logger;
}
namespace DB
{
class ReadBuffer;
Expand Down
50 changes: 28 additions & 22 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ LogWriter::LogWriter(
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_)
bool manual_sync_)
: path(path_)
, file_provider(file_provider_)
, block_offset(0)
, log_number(log_number_)
, recycle_log_files(recycle_log_files_)
, manual_flush(manual_flush_)
, manual_sync(manual_sync_)
, write_buffer(nullptr, 0)
{
log_file = file_provider->newWritableFile(
Expand Down Expand Up @@ -69,26 +69,9 @@ size_t LogWriter::writtenBytes() const
return written_bytes;
}

void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background)
void LogWriter::sync()
{
if (write_buffer.offset() == 0)
{
return;
}

PageUtil::writeFile(log_file,
written_bytes,
write_buffer.buffer().begin(),
write_buffer.offset(),
write_limiter,
/*background=*/background,
/*truncate_if_failed=*/false,
/*enable_failpoint=*/false);
log_file->fsync();
written_bytes += write_buffer.offset();

// reset the write_buffer
resetBuffer();
}

void LogWriter::close()
Expand Down Expand Up @@ -158,9 +141,10 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
begin = false;
} while (payload.hasPendingData());

if (!manual_flush)
flush(write_limiter, background);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
if (!manual_sync)
{
flush(write_limiter, background);
sync();
}
}

Expand Down Expand Up @@ -216,4 +200,26 @@ void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload

block_offset += header_size + length;
}

// Writes whatever is currently held in `write_buffer` to `log_file` and then
// resets the buffer. This function does not call fsync on the file.
//
// write_limiter: forwarded unchanged to PageUtil::writeFile.
// background:    forwarded unchanged to PageUtil::writeFile.
void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background)
{
    // Nothing buffered means nothing to write.
    const auto pending_bytes = write_buffer.offset();
    if (pending_bytes == 0)
        return;

    PageUtil::writeFile(
        log_file,
        written_bytes,
        write_buffer.buffer().begin(),
        pending_bytes,
        write_limiter,
        /*background=*/background,
        /*truncate_if_failed=*/false,
        /*enable_failpoint=*/false);

    // Account for the bytes just handed to the file.
    written_bytes += pending_bytes;

    // Make the buffer reusable for the next record.
    resetBuffer();
}
} // namespace DB::PS::V3
11 changes: 6 additions & 5 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ class LogWriter final : private Allocator<false>
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_ = false);
bool manual_sync_ = false);

DISALLOW_COPY(LogWriter);

~LogWriter();

void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr, bool background = false);

void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false);
void sync();

void close();

Expand All @@ -103,6 +103,8 @@ class LogWriter final : private Allocator<false>

void resetBuffer();

void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false);

private:
String path;
FileProviderPtr file_provider;
Expand All @@ -112,9 +114,8 @@ class LogWriter final : private Allocator<false>
size_t block_offset; // Current offset in block
Format::LogNumberType log_number;
const bool recycle_log_files;
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::flush()
const bool manual_flush;
// If true, the upper layer needs to manually sync the log file after writing by calling LogWriter::sync()
const bool manual_sync;

size_t written_bytes = 0;

Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
namespace CurrentMetrics
{
extern const Metric PSMVCCSnapshotsList;
extern const Metric PSPendingWriterNum;
} // namespace CurrentMetrics

namespace DB
Expand Down Expand Up @@ -964,6 +965,7 @@ PageDirectory<Trait>::PageDirectory(String storage_name, WALStorePtr && wal_, UI
template <typename Trait>
PageDirectorySnapshotPtr PageDirectory<Trait>::createSnapshot(const String & tracing_id) const
{
GET_METRIC(tiflash_storage_page_command_count, type_snapshot).Increment();
auto snap = std::make_shared<PageDirectorySnapshot>(sequence.load(), tracing_id);
{
std::lock_guard snapshots_lock(snapshots_mutex);
Expand Down Expand Up @@ -1018,6 +1020,7 @@ SnapshotsStatistics PageDirectory<Trait>::getSnapshotsStat() const
template <typename Trait>
typename PageDirectory<Trait>::PageIdAndEntry PageDirectory<Trait>::getByIDImpl(const PageId & page_id, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
PageEntryV3 entry_got;

// After two write batches applied: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10}, the `mvcc_table_directory` is:
Expand Down Expand Up @@ -1112,6 +1115,7 @@ template <typename Trait>
std::pair<typename PageDirectory<Trait>::PageIdAndEntries, typename PageDirectory<Trait>::PageIds>
PageDirectory<Trait>::getByIDsImpl(const typename PageDirectory<Trait>::PageIds & page_ids, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
PageEntryV3 entry_got;
PageIds page_not_found = {};

Expand Down Expand Up @@ -1257,6 +1261,7 @@ UInt64 PageDirectory<Trait>::getMaxIdAfterRestart() const
template <typename Trait>
typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIds()
{
GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment();
std::set<PageId> page_ids;

std::shared_lock read_lock(table_rw_mutex);
Expand All @@ -1273,6 +1278,7 @@ typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIds()
template <typename Trait>
typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIdsWithPrefix(const String & prefix, const DB::PageStorageSnapshotPtr & snap_)
{
GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment();
if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
PageIdSet page_ids;
Expand All @@ -1299,6 +1305,7 @@ typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIdsWith
template <typename Trait>
typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIdsInRange(const PageId & start, const PageId & end, const DB::PageStorageSnapshotPtr & snap_)
{
GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment();
if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
PageIdSet page_ids;
Expand Down Expand Up @@ -1473,7 +1480,8 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
// We need to make sure there is only one apply thread to write wal and then increase `sequence`.
// Note that, as read threads use current `sequence` as read_seq, we cannot increase `sequence`
// before applying edit to `mvcc_table_directory`.

GET_METRIC(tiflash_storage_page_command_count, type_write).Increment();
CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum};
Writer w;
w.edit = &edit;

Expand All @@ -1484,6 +1492,7 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
watch.restart();

writers.push_back(&w);
SYNC_FOR("after_PageDirectory::enter_write_group");
w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); });
GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds());
watch.restart();
Expand All @@ -1504,9 +1513,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
// group owner, others just return an empty set.
return {};
}

auto * last_writer = buildWriteGroup(&w, apply_lock);
apply_lock.unlock();
SYNC_FOR("before_PageDirectory::leader_apply");

// `true` means the write process has completed without exception
bool success = false;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,13 @@ class PageDirectory
return u;
}

// Test-only helper: reports how many entries are currently in `writers`.
// `writers` should only be used under the protection of apply_mutex, and this
// function reads it without taking any lock, so never call it from production code.
size_t getWritersQueueSizeForTest()
{
    const size_t queued_writers = writers.size();
    return queued_writers;
}

// No copying and no moving
DISALLOW_COPY_AND_MOVE(PageDirectory);

Expand Down Expand Up @@ -455,7 +462,8 @@ class PageDirectory
std::condition_variable cv;
};

// return the last writer in the group
// Return the last writer in the group
// All the edits in the write group will be merged into `first->edit`.
Writer * buildWriteGroup(Writer * first, std::unique_lock<std::mutex> & /*lock*/);

private:
Expand Down
Loading