PageStorage: Fix entry.tag after full gc && add more debug message (#5094) #5096

Merged
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -34,6 +34,7 @@
#include <Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/TMTContext.h>
#include <common/logger_useful.h>

#include <atomic>
#include <ext/scope_guard.h>
@@ -1162,9 +1163,11 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
}

fiu_do_on(FailPoints::force_slow_page_storage_snapshot_release, {
std::thread thread_hold_snapshots([tasks]() {
std::thread thread_hold_snapshots([this, tasks]() {
LOG_FMT_WARNING(log, "failpoint force_slow_page_storage_snapshot_release begin");
std::this_thread::sleep_for(std::chrono::seconds(5 * 60));
(void)tasks;
LOG_FMT_WARNING(log, "failpoint force_slow_page_storage_snapshot_release end");
});
thread_hold_snapshots.detach();
});
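The only functional change in this failpoint is the lambda capture list: `this` is now captured so the member logger `log` can be used inside the detached thread, while `tasks` is still captured (and touched via `(void)tasks;`) purely to keep the page storage snapshots alive for the duration of the sleep. Below is a minimal, self-contained sketch of the same capture pattern; every name in it is an illustrative stand-in, not the TiFlash API.

```cpp
// Sketch of the capture pattern above (illustrative names only, not TiFlash code):
// a detached thread holds a snapshot-like object for a while and reports progress
// through the owning object's state, which is why `this` must be captured.
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

struct Store
{
    std::string name = "store";
    std::shared_ptr<int> snapshot = std::make_shared<int>(42);

    void holdSnapshotInBackground()
    {
        auto tasks = snapshot; // copying the shared_ptr keeps the snapshot alive in the thread
        std::thread holder([this, tasks]() {
            std::cout << name << ": begin holding snapshot\n"; // member access needs `this`
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            (void)tasks; // referenced only to extend its lifetime until here
            std::cout << name << ": end holding snapshot\n";
        });
        holder.detach();
    }
};

int main()
{
    Store store;
    store.holdSnapshotInBackground();
    std::this_thread::sleep_for(std::chrono::milliseconds(200)); // outlive the detached thread
}
```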
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/PageUtil.h
@@ -281,7 +281,7 @@ void readFile(T & file,
}

if (unlikely(bytes_read != expected_bytes))
throw DB::TiFlashException(fmt::format("No enough data in file {}, read bytes: {} , expected bytes: {}", file->getFileName(), bytes_read, expected_bytes),
throw DB::TiFlashException(fmt::format("No enough data in file {}, read bytes: {}, expected bytes: {}, offset: {}", file->getFileName(), bytes_read, expected_bytes, offset),
Errors::PageStorage::FileSizeNotMatch);
}

3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp
@@ -17,6 +17,7 @@
#include <Poco/Logger.h>
#include <Storages/Page/PageUtil.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <common/logger_useful.h>

namespace DB
{
@@ -30,6 +31,7 @@ namespace tests
static const std::string FileName = "page_util_test";

TEST(PageUtilsTest, ReadWriteFile)
try
{
::remove(FileName.c_str());

@@ -52,6 +54,7 @@ TEST(PageUtilsTest, ReadWriteFile)

::remove(FileName.c_str());
}
CATCH

TEST(PageUtilsTest, FileNotExists)
{
43 changes: 24 additions & 19 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -14,6 +14,8 @@

#include <Common/Checksum.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
@@ -560,7 +562,7 @@ void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler,

for (const auto & [page_id_v3, entry] : entries)
{
auto blob_file = read(entry.file_id, entry.offset, data_buf, entry.size, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, entry.size, read_limiter);

if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
{
@@ -640,7 +642,7 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
// TODO: Continuously fields can read by one system call.
const auto [beg_offset, end_offset] = entry.getFieldOffsets(field_index);
const auto size_to_read = end_offset - beg_offset;
auto blob_file = read(entry.file_id, entry.offset + beg_offset, write_offset, size_to_read, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset + beg_offset, write_offset, size_to_read, read_limiter);
fields_offset_in_page.emplace(field_index, read_size_this_entry);

if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
@@ -737,7 +739,7 @@ PageMap BlobStore::read(PageIDAndEntriesV3 & entries, const ReadLimiterPtr & rea
PageMap page_map;
for (const auto & [page_id_v3, entry] : entries)
{
auto blob_file = read(entry.file_id, entry.offset, pos, entry.size, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset, pos, entry.size, read_limiter);

if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
{
@@ -802,7 +804,7 @@ Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & r
free(p, buf_size);
});

auto blob_file = read(entry.file_id, entry.offset, data_buf, buf_size, read_limiter);
auto blob_file = read(page_id_v3, entry.file_id, entry.offset, data_buf, buf_size, read_limiter);
if constexpr (BLOBSTORE_CHECKSUM_ON_READ)
{
ChecksumClass digest;
@@ -829,11 +831,20 @@ Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & r
return page;
}

BlobFilePtr BlobStore::read(BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
BlobFilePtr BlobStore::read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
{
assert(buffers != nullptr);
auto blob_file = getBlobFile(blob_id);
blob_file->read(buffers, offset, size, read_limiter, background);
BlobFilePtr blob_file = getBlobFile(blob_id);
try
{
blob_file->read(buffers, offset, size, read_limiter, background);
}
catch (DB::Exception & e)
{
// add debug message
e.addMessage(fmt::format("(error while reading page data [page_id={}] [blob_id={}] [offset={}] [size={}] [background={}])", page_id_v3, blob_id, offset, size, background));
e.rethrow();
}
return blob_file;
}
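The new `page_id_v3` parameter is used only for error reporting: if the low-level `BlobFile::read` throws, the catch block appends the page id, blob id, offset, size, and background flag to the exception via `e.addMessage(...)` and rethrows it, so the caller sees exactly which page failed. A self-contained sketch of this catch-enrich-rethrow pattern follows; `ContextError` is a stand-in for `DB::Exception`, not the real class.

```cpp
// Catch-enrich-rethrow: attach caller-side context to an exception before
// letting it propagate. ContextError is a stand-in for DB::Exception.
#include <cstddef>
#include <cstdio>
#include <stdexcept>
#include <string>

struct ContextError : std::runtime_error
{
    using std::runtime_error::runtime_error;
    std::string context;
    void addMessage(const std::string & more) { context += more; }
};

void lowLevelRead(size_t offset, size_t size)
{
    // Pretend the underlying file read failed.
    throw ContextError("cannot read " + std::to_string(size) + " bytes at offset " + std::to_string(offset));
}

void readPage(unsigned page_id, size_t offset, size_t size)
{
    try
    {
        lowLevelRead(offset, size);
    }
    catch (ContextError & e)
    {
        // The low-level reader does not know which page it was reading; add that here.
        e.addMessage(" (while reading page_id=" + std::to_string(page_id) + ")");
        throw; // rethrow the same (now enriched) exception object
    }
}

int main()
{
    try
    {
        readPage(120, 0, 1024);
    }
    catch (const ContextError & e)
    {
        std::printf("%s%s\n", e.what(), e.context.c_str());
    }
}
```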

@@ -1142,21 +1153,15 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
std::tie(blobfile_id, file_offset_begin) = getPosFromStats(next_alloc_size);
}

PageEntryV3 new_entry;

read(file_id, entry.offset, data_pos, entry.size, read_limiter, /*background*/ true);

// No need do crc again, crc won't be changed.
new_entry.checksum = entry.checksum;

// Need copy the field_offsets
new_entry.field_offsets = entry.field_offsets;

// Entry size won't be changed.
new_entry.size = entry.size;
// Read the data into buffer by old entry
read(page_id, file_id, entry.offset, data_pos, entry.size, read_limiter, /*background*/ true);

// Most vars of the entry is not changed, but the file id and offset
// need to be updated.
PageEntryV3 new_entry = entry;
new_entry.file_id = blobfile_id;
new_entry.offset = file_offset_begin + offset_in_data;
new_entry.padded_size = 0; // reset padded size to be zero

offset_in_data += new_entry.size;
data_pos += new_entry.size;
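This hunk is the actual fix for the lost `entry.tag` (#5094): the old code default-constructed a fresh `PageEntryV3` and copied only `checksum`, `field_offsets`, and `size`, so every other field, including `tag`, silently reverted to its default once full GC relocated the page. The new code copies the whole entry and overrides only the fields that relocation changes (`file_id`, `offset`, `padded_size`). A tiny sketch of the difference, using an illustrative struct rather than the real `PageEntryV3`:

```cpp
// Field-by-field copy vs. whole-struct copy. `Entry` is an illustrative stand-in
// for PageEntryV3, not its real definition.
#include <cassert>
#include <cstdint>

struct Entry
{
    uint64_t file_id = 0;
    uint64_t offset = 0;
    uint64_t size = 0;
    uint64_t tag = 0; // the field the old copy forgot
    uint64_t checksum = 0;
    uint64_t padded_size = 0;
};

int main()
{
    Entry old_entry;
    old_entry.file_id = 1;
    old_entry.offset = 0;
    old_entry.size = 1024;
    old_entry.tag = 12345;
    old_entry.checksum = 0xabcd;
    old_entry.padded_size = 8;

    // Old approach: start from a default entry and copy selected fields -> tag is dropped.
    Entry relocated_old;
    relocated_old.checksum = old_entry.checksum;
    relocated_old.size = old_entry.size;
    relocated_old.file_id = 2;   // new blob file chosen by full GC
    relocated_old.offset = 4096; // new offset inside that file
    assert(relocated_old.tag == 0); // bug: tag silently reset to its default

    // Fixed approach: copy the whole entry, then override only what relocation changes.
    Entry relocated_new = old_entry;
    relocated_new.file_id = 2;
    relocated_new.offset = 4096;
    relocated_new.padded_size = 0;
    assert(relocated_new.tag == 12345); // tag (and any future field) survives full GC
    return 0;
}
```

The whole-struct copy also means any field added to `PageEntryV3` in the future is preserved across full GC without further changes here.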
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
@@ -299,7 +299,7 @@ class BlobStore : private Allocator<false>

PageEntriesEdit handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);

BlobFilePtr read(BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);
BlobFilePtr read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);

/**
* Ask BlobStats to get a span from BlobStat.
9 changes: 6 additions & 3 deletions dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp
@@ -581,7 +581,8 @@ TEST_F(BlobStoreTest, testWriteRead)
ASSERT_EQ(record.entry.file_id, 1);

// Read directly from the file
blob_store.read(record.entry.file_id,
blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id),
record.entry.file_id,
record.entry.offset,
c_buff_read + index * buff_size,
record.entry.size,
@@ -681,7 +682,7 @@ TEST_F(BlobStoreTest, testWriteReadWithIOLimiter)
{
for (const auto & record : edits[i].getRecords())
{
blob_store.read(record.entry.file_id,
blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id),
record.entry.file_id,
record.entry.offset,
c_buff_read + i * buff_size,
record.entry.size,
@@ -859,7 +861,8 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead)
ASSERT_EQ(check_field_sizes, offsets);

// Read
blob_store.read(record.entry.file_id,
blob_store.read(buildV3Id(TEST_NAMESPACE_ID, page_id),
record.entry.file_id,
record.entry.offset,
c_buff_read + index * buff_size,
record.entry.size,
84 changes: 84 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
@@ -1411,6 +1411,89 @@ try
}
CATCH

TEST_F(PageStorageTest, TruncateBlobFile)
try
{
const size_t buf_sz = 1024;
char c_buff[buf_sz];

for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}

{
WriteBatch batch;
batch.putPage(1, 0, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

auto blob_file = Poco::File(getTemporaryPath() + "/blobfile_1");

page_storage = reopenWithConfig(config);
EXPECT_GT(blob_file.getSize(), 0);

{
WriteBatch batch;
batch.delPage(1);
page_storage->write(std::move(batch));
}
page_storage = reopenWithConfig(config);
page_storage->gc(/*not_skip*/ false, nullptr, nullptr);
EXPECT_EQ(blob_file.getSize(), 0);
}
CATCH

TEST_F(PageStorageTest, EntryTagAfterFullGC)
try
{
{
PageStorage::Config config;
config.blob_heavy_gc_valid_rate = 1.0; /// always run full gc
page_storage = reopenWithConfig(config);
}

const size_t buf_sz = 1024;
char c_buff[buf_sz];

for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}

PageId page_id = 120;
UInt64 tag = 12345;
{
WriteBatch batch;
batch.putPage(page_id, tag, std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz), buf_sz, {});
page_storage->write(std::move(batch));
}

{
auto entry = page_storage->getEntry(page_id);
ASSERT_EQ(entry.tag, tag);
auto page = page_storage->read(page_id);
for (size_t i = 0; i < buf_sz; ++i)
{
EXPECT_EQ(*(page.data.begin() + i), static_cast<char>(i % 0xff));
}
}

auto done_full_gc = page_storage->gc();
EXPECT_TRUE(done_full_gc);

{
auto entry = page_storage->getEntry(page_id);
ASSERT_EQ(entry.tag, tag);
auto page = page_storage->read(page_id);
for (size_t i = 0; i < buf_sz; ++i)
{
EXPECT_EQ(*(page.data.begin() + i), static_cast<char>(i % 0xff));
}
}
}
CATCH

TEST_F(PageStorageTest, DumpPageStorageSnapshot)
try
{
@@ -1553,6 +1636,7 @@
}
CATCH


TEST_F(PageStorageTest, ReloadConfig)
try
{