remove unnecessary fsync
lidezhu committed Jun 8, 2023
1 parent c90ecbd commit 1607a0e
Showing 5 changed files with 47 additions and 34 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
@@ -28,6 +28,7 @@
M(PSMVCCSnapshotsList) \
M(PSMVCCNumDelta) \
M(PSMVCCNumBase) \
+ M(PSWriterQueueSize) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
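
For orientation, CurrentMetrics entries like the new PSWriterQueueSize are declared through an X-macro list and expanded into metric objects elsewhere. A minimal sketch of that pattern, assuming a simplified Metric type (illustrative only, not the actual CurrentMetrics.cpp machinery):

#include <cstddef>

// Illustrative stand-in for the real Metric type and list.
using Metric = std::size_t;

#define APPLY_FOR_METRICS(M) \
    M(PSMVCCNumBase)         \
    M(PSWriterQueueSize)     \
    M(RWLockWaitingReaders)

// Expand the list once so each entry becomes a distinct metric id.
#define M(NAME) const Metric NAME = __COUNTER__;
APPLY_FOR_METRICS(M)
#undef M
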
57 changes: 33 additions & 24 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
@@ -161,33 +161,42 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(

// 2. For entry edits without the checkpoint info, or it is stored on an existing data file that needs compact,
// write the entry data to the data file, and assign a new checkpoint info.
- auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
- RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
- auto data_location = data_writer->write(
-     rec_edit.page_id,
-     rec_edit.version,
-     page.data.begin(),
-     page.data.size());
- // the page data size uploaded in this checkpoint
- write_down_stats.num_bytes[static_cast<size_t>(id_storage_type)] += rec_edit.entry.size;
- current_write_size += data_location.size_in_file;
- RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
- bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
- rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
-     .data_location = data_location,
-     .is_valid = true,
-     .is_local_data_reclaimed = is_local_data_reclaimed,
- };
- locked_files.emplace(*data_location.data_file_id);
- if (is_compaction)
+ try
{
-     write_down_stats.compact_data_bytes += rec_edit.entry.size;
-     write_down_stats.num_pages_compact += 1;
+     auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
+     RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
+     auto data_location = data_writer->write(
+         rec_edit.page_id,
+         rec_edit.version,
+         page.data.begin(),
+         page.data.size());
+     // the page data size uploaded in this checkpoint
+     write_down_stats.num_bytes[static_cast<size_t>(id_storage_type)] += rec_edit.entry.size;
+     current_write_size += data_location.size_in_file;
+     RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
+     bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
+     rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
+         .data_location = data_location,
+         .is_valid = true,
+         .is_local_data_reclaimed = is_local_data_reclaimed,
+     };
+     locked_files.emplace(*data_location.data_file_id);
+     if (is_compaction)
+     {
+         write_down_stats.compact_data_bytes += rec_edit.entry.size;
+         write_down_stats.num_pages_compact += 1;
+     }
+     else
+     {
+         write_down_stats.incremental_data_bytes += rec_edit.entry.size;
+         write_down_stats.num_pages_incremental += 1;
+     }
}
- else
+ catch (...)
{
-     write_down_stats.incremental_data_bytes += rec_edit.entry.size;
-     write_down_stats.num_pages_incremental += 1;
+     LOG_ERROR(log, "failed to read page, record={}", rec_edit);
+     tryLogCurrentException(__PRETTY_FUNCTION__);
+     throw;
}
}

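
This hunk does not alter what gets written; it wraps the page read/write in a try/catch so that a failure logs which record was being processed before the exception propagates. A minimal, self-contained sketch of that log-and-rethrow pattern (simplified names, not the TiFlash code):

#include <iostream>
#include <stdexcept>
#include <string>

// Sketch: do the fallible work, and on any exception record which record
// failed, then rethrow so the caller's error handling still runs.
void writeRecord(const std::string & record_id, bool simulate_failure)
{
    try
    {
        if (simulate_failure)
            throw std::runtime_error("failed to read page");
        // ... read the page and write it to the checkpoint data file ...
    }
    catch (...)
    {
        std::cerr << "failed to read page, record=" << record_id << '\n';
        throw; // propagate the original exception unchanged
    }
}

int main()
{
    writeRecord("page_1", false); // succeeds silently
    try
    {
        writeRecord("page_2", true); // logs the record id, then rethrows
    }
    catch (const std::exception & e)
    {
        std::cerr << "caller sees: " << e.what() << '\n';
    }
}
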
18 changes: 10 additions & 8 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
@@ -15,6 +15,8 @@
#include <Common/Checksum.h>
#include <Common/Exception.h>
#include <Common/Logger.h>
+ #include <Common/Stopwatch.h>
+ #include <Common/TiFlashMetrics.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
@@ -69,7 +71,7 @@ size_t LogWriter::writtenBytes() const
return written_bytes;
}

- void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background)
+ void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background, bool sync)
{
if (write_buffer.offset() == 0)
{
@@ -84,7 +86,10 @@ void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background)
/*background=*/background,
/*truncate_if_failed=*/false,
/*enable_failpoint=*/false);
- log_file->fsync();
+ if (sync)
+ {
+     log_file->fsync();
+ }
written_bytes += write_buffer.offset();

// reset the write_buffer
@@ -115,7 +120,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
static constexpr char MAX_ZERO_HEADER[Format::RECYCLABLE_HEADER_SIZE]{'\x00'};
if (unlikely(buffer_size - write_buffer.offset() < leftover))
{
- flush(write_limiter, background);
+ flush(write_limiter, background, false);
}
writeString(MAX_ZERO_HEADER, leftover, write_buffer);
block_offset = 0;
Expand All @@ -141,7 +146,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
// Check available space in write_buffer before writing
if (buffer_size - write_buffer.offset() < fragment_length + header_size)
{
- flush(write_limiter, background);
+ flush(write_limiter, background, false);
}
try
{
@@ -158,10 +163,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
begin = false;
} while (payload.hasPendingData());

- if (!manual_flush)
- {
-     flush(write_limiter, background);
- }
+ flush(write_limiter, background, !manual_flush);
}

void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length)
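
The effect of the new sync flag: flushes that only drain the buffer to make room (the ones inside addRecord) no longer fsync, and a single fsync happens at the end of the record unless the caller has asked for manual flushing. A simplified, self-contained sketch of that shape, with a stand-in buffered writer and fflush in place of the real fsync (not the TiFlash LogWriter):

#include <cstddef>
#include <cstdio>
#include <string>

class BufferedLogWriter
{
public:
    explicit BufferedLogWriter(std::FILE * f) : file(f) {}

    void addRecord(const std::string & payload, bool manual_flush = false)
    {
        for (char c : payload)
        {
            if (used == sizeof(buffer))
                flush(/*sync=*/false); // intermediate flush: drain the buffer, skip fsync
            buffer[used++] = c;
        }
        // At most one sync per record, and none if the caller batches flushes manually.
        flush(/*sync=*/!manual_flush);
    }

    void flush(bool sync = true)
    {
        if (used == 0)
            return;
        std::fwrite(buffer, 1, used, file);
        used = 0;
        if (sync)
            std::fflush(file); // stand-in for fsync on the underlying log file
    }

private:
    std::FILE * file;
    char buffer[4096];
    std::size_t used = 0;
};
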
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/LogFile/LogWriter.h
@@ -87,7 +87,7 @@ class LogWriter final : private Allocator<false>

void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr, bool background = false);

- void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false);
+ void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false, bool sync = true);

void close();

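
Because the new parameter defaults to true, call sites that do not pass it keep the old flush-then-fsync behaviour; only the updated calls in LogWriter.cpp opt out. Hypothetical call sites, for illustration (writer is an assumed LogWriter instance):

writer.flush();                      // defaults: no limiter, foreground, sync = true, so fsync runs
writer.flush(nullptr, false, false); // intermediate flush, e.g. inside addRecord: no fsync
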
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.cpp
@@ -50,6 +50,7 @@
namespace CurrentMetrics
{
extern const Metric PSMVCCSnapshotsList;
+ extern const Metric PSWriterQueueSize;
} // namespace CurrentMetrics

namespace DB
@@ -1484,6 +1485,7 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
watch.restart();

writers.push_back(&w);
+ CurrentMetrics::set(CurrentMetrics::PSWriterQueueSize, writers.size());
w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); });
GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds());
watch.restart();
@@ -1504,7 +1506,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
// group owner, others just return an empty set.
return {};
}
-
auto * last_writer = buildWriteGroup(&w, apply_lock);
apply_lock.unlock();

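
PSWriterQueueSize samples the depth of the write-group queue at the moment a writer enqueues itself. A simplified sketch of that group-commit shape, with a plain counter standing in for CurrentMetrics::set (assumed names, not the TiFlash PageDirectory):

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

struct Writer
{
    bool done = false;
    std::condition_variable cv;
};

class WriteGroupQueue
{
public:
    void apply(Writer & w)
    {
        std::unique_lock<std::mutex> lock(mtx);
        writers.push_back(&w);
        queue_size = writers.size(); // the value the new gauge records
        // Wait until this writer is either finished by a group leader or at the front.
        w.cv.wait(lock, [&] { return w.done || &w == writers.front(); });
        // ... the front writer batches the queued edits, applies them, marks the
        //     others done and notifies them (elided) ...
    }

private:
    std::mutex mtx;
    std::deque<Writer *> writers;
    std::size_t queue_size = 0;
};
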
