Skip to content

Commit

Permalink
Merge branch 'master' into wenxuan/readme-polish2
Browse files Browse the repository at this point in the history
  • Loading branch information
breezewish authored May 19, 2022
2 parents da6f5de + aedee1c commit ba846bf
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 18 deletions.
21 changes: 15 additions & 6 deletions dbms/src/Storages/Page/PageUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ void ftruncateFile(T & file, off_t length)
DB::throwFromErrno(fmt::format("Cannot truncate file: {}. ", file->getFileName()), ErrorCodes::CANNOT_FTRUNCATE);
}


// TODO: split current api into V2 and V3.
// Too many args in this function.
// Also split read
template <typename T>
void writeFile(
T & file,
Expand All @@ -161,6 +163,7 @@ void writeFile(
size_t to_write,
const WriteLimiterPtr & write_limiter = nullptr,
const bool background = false,
const bool truncate_if_failed = true,
[[maybe_unused]] bool enable_failpoint = false)
{
if (write_limiter)
Expand Down Expand Up @@ -192,15 +195,21 @@ void writeFile(
{
ProfileEvents::increment(ProfileEvents::PSMWriteFailed);
auto saved_errno = errno;
// If error occurs, apply `ftruncate` try to truncate the broken bytes we have written.
// Note that the result of this ftruncate is ignored, there is nothing we can do to
// handle ftruncate error. The errno may change after ftruncate called.
int truncate_res = ::ftruncate(file->getFd(), offset);

int truncate_res = 0;
// If write failed in V3, Don't do truncate
if (truncate_if_failed)
{
// If error occurs, apply `ftruncate` try to truncate the broken bytes we have written.
// Note that the result of this ftruncate is ignored, there is nothing we can do to
// handle ftruncate error. The errno may change after ftruncate called.
truncate_res = ::ftruncate(file->getFd(), offset);
}

DB::throwFromErrno(fmt::format("Cannot write to file {},[truncate_res = {}],[errno_after_truncate = {}],"
"[bytes_written={},to_write={},offset = {}]",
file->getFileName(),
truncate_res,
truncate_if_failed ? DB::toString(truncate_res) : "no need truncate",
strerror(errno),
bytes_written,
to_write,
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/Page/V2/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ bool PageFile::LinkingMetaAdapter::linkToNewSequenceNext(WriteBatch::SequenceID

if (binary_version == PageFormat::V2)
{
const UInt64 num_fields = PageUtil::get<UInt64>(pos);
const auto num_fields = PageUtil::get<UInt64>(pos);
entry.field_offsets.reserve(num_fields);
for (size_t i = 0; i < num_fields; ++i)
{
Expand Down Expand Up @@ -649,7 +649,7 @@ void PageFile::MetaMergingReader::moveNext(PageFormat::Version * v)

if (binary_version == PageFormat::V2)
{
const UInt64 num_fields = PageUtil::get<UInt64>(pos);
const auto num_fields = PageUtil::get<UInt64>(pos);
entry.field_offsets.reserve(num_fields);
for (size_t i = 0; i < num_fields; ++i)
{
Expand Down Expand Up @@ -792,7 +792,15 @@ size_t PageFile::Writer::write(DB::WriteBatch & wb, PageEntriesEdit & edit, cons
SCOPE_EXIT({ page_file.free(data_buf.begin(), data_buf.size()); });

auto write_buf = [&](WritableFilePtr & file, UInt64 offset, ByteBuffer buf, bool enable_failpoint) {
PageUtil::writeFile(file, offset, buf.begin(), buf.size(), write_limiter, background, enable_failpoint);
PageUtil::writeFile(
file,
offset,
buf.begin(),
buf.size(),
write_limiter,
background,
/*truncate_if_failed=*/true,
/*enable_failpoint=*/enable_failpoint);
if (sync_on_write)
PageUtil::syncFile(file);
};
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace tests
{
static const std::string FileName = "page_util_test";

TEST(PageUtils_test, ReadWriteFile)
TEST(PageUtilsTest, ReadWriteFile)
{
::remove(FileName.c_str());

Expand All @@ -41,7 +41,7 @@ TEST(PageUtils_test, ReadWriteFile)
buff_write[i] = i % 0xFF;
}
WritableFilePtr file_for_write = std::make_shared<PosixWritableFile>(FileName, true, -1, 0666);
PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, /*write_limiter*/ nullptr, /*background*/ false, /*enable_failpoint*/ true);
PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, /*write_limiter*/ nullptr, /*background*/ false, /*truncate_if_failed*/ true, /*enable_failpoint*/ true);
PageUtil::syncFile(file_for_write);
file_for_write->close();

Expand All @@ -53,15 +53,15 @@ TEST(PageUtils_test, ReadWriteFile)
::remove(FileName.c_str());
}

TEST(PageUtils_test, FileNotExists)
TEST(PageUtilsTest, FileNotExists)
{
::remove(FileName.c_str());

int fd = PageUtil::openFile<true, false>(FileName);
ASSERT_EQ(fd, 0);
}

TEST(PageUtils_test, BigReadWriteFile)
TEST(PageUtilsTest, BigReadWriteFile)
{
::remove(FileName.c_str());

Expand All @@ -78,7 +78,7 @@ TEST(PageUtils_test, BigReadWriteFile)
buff_write[i] = i % 0xFF;
}

PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, nullptr, /*background*/ false, /*enable_failpoint*/ false);
PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, nullptr, /*background*/ false, /*truncate_if_failed*/ true, /*enable_failpoint*/ false);
PageUtil::syncFile(file_for_write);
file_for_write->close();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/BlobFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ void BlobFile::write(char * buffer, size_t offset, size_t size, const WriteLimit
});

#ifndef NDEBUG
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, true);
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, /*truncate_if_failed=*/false, /*enable_failpoint=*/true);
#else
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, false);
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, /*truncate_if_failed=*/false, /*enable_failpoint=*/false);
#endif
PageUtil::syncFile(wrfile);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
catch (DB::Exception & e)
{
removePosFromStats(blob_id, offset_in_file, actually_allocated_size);
LOG_FMT_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] [actually_allocated_size={}] write failed.", blob_id, offset_in_file, all_page_data_size, actually_allocated_size);
LOG_FMT_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] [actually_allocated_size={}] write failed [error={}]", blob_id, offset_in_file, all_page_data_size, actually_allocated_size, e.message());
throw e;
}

Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ size_t LogWriter::writtenBytes() const

void LogWriter::flush(const WriteLimiterPtr & write_limiter, const bool background)
{
PageUtil::writeFile(log_file, written_bytes, write_buffer.buffer().begin(), write_buffer.offset(), write_limiter, /*background*/ background, /*enable_failpoint*/ false);
PageUtil::writeFile(log_file,
written_bytes,
write_buffer.buffer().begin(),
write_buffer.offset(),
write_limiter,
/*background=*/background,
/*truncate_if_failed=*/false,
/*enable_failpoint=*/false);
log_file->fsync();
written_bytes += write_buffer.offset();

Expand Down

0 comments on commit ba846bf

Please sign in to comment.