Skip to content

Commit

Permalink
This is an automated cherry-pick of #5094
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
JaySon-Huang committed Aug 18, 2022
1 parent e7deb68 commit c5b66bd
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 25 deletions.
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/TMTContext.h>
#include <common/logger_useful.h>

#include <atomic>
#include <ext/scope_guard.h>
Expand Down Expand Up @@ -1144,9 +1145,11 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
}

fiu_do_on(FailPoints::force_slow_page_storage_snapshot_release, {
std::thread thread_hold_snapshots([tasks]() {
std::thread thread_hold_snapshots([this, tasks]() {
LOG_FMT_WARNING(log, "failpoint force_slow_page_storage_snapshot_release begin");
std::this_thread::sleep_for(std::chrono::seconds(5 * 60));
(void)tasks;
LOG_FMT_WARNING(log, "failpoint force_slow_page_storage_snapshot_release end");
});
thread_hold_snapshots.detach();
});
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/PageUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ void readFile(T & file,
}

if (unlikely(bytes_read != expected_bytes))
throw DB::TiFlashException(fmt::format("No enough data in file {}, read bytes: {} , expected bytes: {}", file->getFileName(), bytes_read, expected_bytes),
throw DB::TiFlashException(fmt::format("No enough data in file {}, read bytes: {}, expected bytes: {}, offset: {}", file->getFileName(), bytes_read, expected_bytes, offset),
Errors::PageStorage::FileSizeNotMatch);
}

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Poco/Logger.h>
#include <Storages/Page/PageUtil.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <common/logger_useful.h>

namespace DB
{
Expand All @@ -30,6 +31,7 @@ namespace tests
static const std::string FileName = "page_util_test";

TEST(PageUtilsTest, ReadWriteFile)
try
{
::remove(FileName.c_str());

Expand All @@ -52,6 +54,7 @@ TEST(PageUtilsTest, ReadWriteFile)

::remove(FileName.c_str());
}
CATCH

TEST(PageUtilsTest, FileNotExists)
{
Expand Down
43 changes: 24 additions & 19 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <Common/Checksum.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
Expand Down Expand Up @@ -560,7 +562,7 @@ void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler,

for (const auto & [page_id_v3, entry] : entries)
{
auto blob_file = read(entry.file_id, entry.offset, data_buf, entry.size, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, entry.size, read_limiter);

if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
{
Expand Down Expand Up @@ -640,7 +642,7 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
// TODO: Continuously fields can read by one system call.
const auto [beg_offset, end_offset] = entry.getFieldOffsets(field_index);
const auto size_to_read = end_offset - beg_offset;
auto blob_file = read(entry.file_id, entry.offset + beg_offset, write_offset, size_to_read, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset + beg_offset, write_offset, size_to_read, read_limiter);
fields_offset_in_page.emplace(field_index, read_size_this_entry);

if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
Expand Down Expand Up @@ -737,7 +739,7 @@ PageMap BlobStore::read(PageIDAndEntriesV3 & entries, const ReadLimiterPtr & rea
PageMap page_map;
for (const auto & [page_id_v3, entry] : entries)
{
auto blob_file = read(entry.file_id, entry.offset, pos, entry.size, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset, pos, entry.size, read_limiter);

if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
{
Expand Down Expand Up @@ -802,7 +804,7 @@ Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & r
free(p, buf_size);
});

auto blob_file = read(entry.file_id, entry.offset, data_buf, buf_size, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, buf_size, read_limiter);
if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
{
ChecksumClass digest;
Expand All @@ -829,11 +831,20 @@ Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & r
return page;
}

BlobFilePtr BlobStore::read(BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
BlobFilePtr BlobStore::read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
{
assert(buffers != nullptr);
auto blob_file = getBlobFile(blob_id);
blob_file->read(buffers, offset, size, read_limiter, background);
BlobFilePtr blob_file = getBlobFile(blob_id);
try
{
blob_file->read(buffers, offset, size, read_limiter, background);
}
catch (DB::Exception & e)
{
// add debug message
e.addMessage(fmt::format("(error while reading page data [page_id={}] [blob_id={}] [offset={}] [size={}] [background={}])", page_id_v3, blob_id, offset, size, background));
e.rethrow();
}
return blob_file;
}

Expand Down Expand Up @@ -1142,21 +1153,15 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
std::tie(blobfile_id, file_offset_begin) = getPosFromStats(next_alloc_size);
}

PageEntryV3 new_entry;

read(file_id, entry.offset, data_pos, entry.size, read_limiter, /*background*/ true);

// No need do crc again, crc won't be changed.
new_entry.checksum = entry.checksum;

// Need copy the field_offsets
new_entry.field_offsets = entry.field_offsets;

// Entry size won't be changed.
new_entry.size = entry.size;
// Read the data into buffer by old entry
read(page_id, file_id, entry.offset, data_pos, entry.size, read_limiter, /*background*/ true);

// Most vars of the entry is not changed, but the file id and offset
// need to be updated.
PageEntryV3 new_entry = entry;
new_entry.file_id = blobfile_id;
new_entry.offset = file_offset_begin + offset_in_data;
new_entry.padded_size = 0; // reset padded size to be zero

offset_in_data += new_entry.size;
data_pos += new_entry.size;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class BlobStore : private Allocator<false>

PageEntriesEdit handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);

BlobFilePtr read(BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);
BlobFilePtr read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);

/**
* Ask BlobStats to get a span from BlobStat.
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ TEST_F(BlobStoreTest, testWriteRead)
ASSERT_EQ(record.entry.file_id, 1);

// Read directly from the file
blob_store.read(record.entry.file_id,
blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id),
record.entry.file_id,
record.entry.offset,
c_buff_read + index * buff_size,
record.entry.size,
Expand Down Expand Up @@ -681,7 +682,8 @@ TEST_F(BlobStoreTest, testWriteReadWithIOLimiter)
{
for (const auto & record : edits[i].getRecords())
{
blob_store.read(record.entry.file_id,
blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id),
record.entry.file_id,
record.entry.offset,
c_buff_read + i * buff_size,
record.entry.size,
Expand Down Expand Up @@ -859,7 +861,8 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead)
ASSERT_EQ(check_field_sizes, offsets);

// Read
blob_store.read(record.entry.file_id,
blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id),
record.entry.file_id,
record.entry.offset,
c_buff_read + index * buff_size,
record.entry.size,
Expand Down
84 changes: 84 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,89 @@ try
}
CATCH

TEST_F(PageStorageTest, TruncateBlobFile)
try
{
const size_t buf_sz = 1024;
char c_buff[buf_sz];

for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}

{
WriteBatch batch;
batch.putPage(1, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

auto blob_file = Poco::File(getTemporaryPath() + "/blobfile_1");

page_storage = reopenWithConfig(config);
EXPECT_GT(blob_file.getSize(), 0);

{
WriteBatch batch;
batch.delPage(1);
page_storage->write(std::move(batch));
}
page_storage = reopenWithConfig(config);
page_storage->gc(/*not_skip*/ false, nullptr, nullptr);
EXPECT_EQ(blob_file.getSize(), 0);
}
CATCH

TEST_F(PageStorageTest, EntryTagAfterFullGC)
try
{
{
PageStorage::Config config;
config.blob_heavy_gc_valid_rate = 1.0; /// always run full gc
page_storage = reopenWithConfig(config);
}

const size_t buf_sz = 1024;
char c_buff[buf_sz];

for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}

PageId page_id = 120;
UInt64 tag = 12345;
{
WriteBatch batch;
batch.putPage(page_id, tag, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

{
auto entry = page_storage->getEntry(page_id);
ASSERT_EQ(entry.tag, tag);
auto page = page_storage->read(page_id);
for (size_t i = 0; i < buf_sz; ++i)
{
EXPECT_EQ(*(page.data.begin() + i), static_cast<char>(i % 0xff));
}
}

auto done_full_gc = page_storage->gc();
EXPECT_TRUE(done_full_gc);

{
auto entry = page_storage->getEntry(page_id);
ASSERT_EQ(entry.tag, tag);
auto page = page_storage->read(page_id);
for (size_t i = 0; i < buf_sz; ++i)
{
EXPECT_EQ(*(page.data.begin() + i), static_cast<char>(i % 0xff));
}
}
}
CATCH

TEST_F(PageStorageTest, DumpPageStorageSnapshot)
try
{
Expand Down Expand Up @@ -1553,6 +1636,7 @@ try
}
CATCH


TEST_F(PageStorageTest, ReloadConfig)
try
{
Expand Down

0 comments on commit c5b66bd

Please sign in to comment.