Skip to content

Commit

Permalink
small refactor LogWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Jun 8, 2023
1 parent 1607a0e commit f77342d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 42 deletions.
62 changes: 33 additions & 29 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
#include <Common/Checksum.h>
#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
Expand All @@ -33,13 +31,13 @@ LogWriter::LogWriter(
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_)
bool manual_sync_)
: path(path_)
, file_provider(file_provider_)
, block_offset(0)
, log_number(log_number_)
, recycle_log_files(recycle_log_files_)
, manual_flush(manual_flush_)
, manual_sync(manual_sync_)
, write_buffer(nullptr, 0)
{
log_file = file_provider->newWritableFile(
Expand Down Expand Up @@ -71,29 +69,9 @@ size_t LogWriter::writtenBytes() const
return written_bytes;
}

void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background, bool sync)
void LogWriter::sync()
{
if (write_buffer.offset() == 0)
{
return;
}

PageUtil::writeFile(log_file,
written_bytes,
write_buffer.buffer().begin(),
write_buffer.offset(),
write_limiter,
/*background=*/background,
/*truncate_if_failed=*/false,
/*enable_failpoint=*/false);
if (sync)
{
log_file->fsync();
}
written_bytes += write_buffer.offset();

// reset the write_buffer
resetBuffer();
log_file->fsync();
}

void LogWriter::close()
Expand All @@ -120,7 +98,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
static constexpr char MAX_ZERO_HEADER[Format::RECYCLABLE_HEADER_SIZE]{'\x00'};
if (unlikely(buffer_size - write_buffer.offset() < leftover))
{
flush(write_limiter, background, false);
flush(write_limiter, background);
}
writeString(MAX_ZERO_HEADER, leftover, write_buffer);
block_offset = 0;
Expand All @@ -146,7 +124,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
// Check available space in write_buffer before writing
if (buffer_size - write_buffer.offset() < fragment_length + header_size)
{
flush(write_limiter, background, false);
flush(write_limiter, background);
}
try
{
Expand All @@ -163,7 +141,11 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
begin = false;
} while (payload.hasPendingData());

flush(write_limiter, background, !manual_flush);
flush(write_limiter, background);
if (!manual_sync)
{
sync();
}
}

void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length)
Expand Down Expand Up @@ -218,4 +200,26 @@ void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload

block_offset += header_size + length;
}

void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background)
{
if (write_buffer.offset() == 0)
{
return;
}

PageUtil::writeFile(log_file,
written_bytes,
write_buffer.buffer().begin(),
write_buffer.offset(),
write_limiter,
/*background=*/background,
/*truncate_if_failed=*/false,
/*enable_failpoint=*/false);

written_bytes += write_buffer.offset();

// reset the write_buffer
resetBuffer();
}
} // namespace DB::PS::V3
11 changes: 6 additions & 5 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ class LogWriter final : private Allocator<false>
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_ = false);
bool manual_sync_ = false);

DISALLOW_COPY(LogWriter);

~LogWriter();

void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr, bool background = false);

void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false, bool sync = true);
void sync();

void close();

Expand All @@ -103,6 +103,8 @@ class LogWriter final : private Allocator<false>

void resetBuffer();

void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false);

private:
String path;
FileProviderPtr file_provider;
Expand All @@ -112,9 +114,8 @@ class LogWriter final : private Allocator<false>
size_t block_offset; // Current offset in block
Format::LogNumberType log_number;
const bool recycle_log_files;
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::flush()
const bool manual_flush;
// If true, the upper layer need manually sync the log file after write by calling LogWriter::sync()
const bool manual_sync;

size_t written_bytes = 0;

Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Format::LogNumberType WALStore::rollToNewLogWriter(const std::lock_guard<std::mu

std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
bool manual_flush)
bool temp_file)
{
String path;

Expand All @@ -130,7 +130,7 @@ std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
path += wal_folder_prefix;

LogFilename log_filename = LogFilename{
(manual_flush ? LogFileStage::Temporary : LogFileStage::Normal),
(temp_file ? LogFileStage::Temporary : LogFileStage::Normal),
new_log_lvl.first,
new_log_lvl.second,
0,
Expand All @@ -139,12 +139,13 @@ std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
auto fullname = log_filename.fullname(log_filename.stage);
// TODO check whether the file already existed
LOG_INFO(logger, "Creating log file for writing [fullname={}]", fullname);
// if it is a temp file, we will manually sync it after writing snapshot
auto log_writer = std::make_unique<LogWriter>(
fullname,
provider,
new_log_lvl.first,
/*recycle*/ true,
/*manual_flush*/ manual_flush);
/*manual_sync*/ temp_file);
return {std::move(log_writer), log_filename};
}

Expand Down Expand Up @@ -218,13 +219,13 @@ bool WALStore::saveSnapshot(
// Use {largest_log_num, 1} to save the `edit`
const auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
// Create a temporary file for saving directory snapshot
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*manual_flush*/ true);
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*temp_file*/ true);

// TODO: split the snap into multiple records in LogFile so that the memory
// consumption could be more smooth.
ReadBufferFromString payload(serialized_snap);
compact_log->addRecord(payload, serialized_snap.size(), write_limiter, /*background*/ true);
compact_log->flush(write_limiter, /*background*/ true);
compact_log->sync();
compact_log.reset(); // close fd explicitly before renaming file.

// Rename it to be a normal log file.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/WALStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class WALStore
std::tuple<std::unique_ptr<LogWriter>, LogFilename>
createLogWriter(
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
bool manual_flush);
bool temp_file);

Format::LogNumberType rollToNewLogWriter(const std::lock_guard<std::mutex> &);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ INSTANTIATE_TEST_CASE_P(
return fmt::format("{}_{}", recycle_log, allow_retry_read);
});

TEST(LogFileRWTest2, ManuallyFlush)
TEST(LogFileRWTest2, ManuallySync)
{
auto provider = TiFlashTestEnv::getDefaultFileProvider();
auto path = TiFlashTestEnv::getTemporaryPath("LogFileRWTest2");
Expand All @@ -835,7 +835,7 @@ TEST(LogFileRWTest2, ManuallyFlush)
ReadBufferFromString buff(payload);
ASSERT_NO_THROW(writer->addRecord(buff, payload.size()));
}
writer->flush();
writer->sync();

auto read_buf = createReadBufferFromFileBaseByFileProvider(
provider,
Expand Down

0 comments on commit f77342d

Please sign in to comment.