Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage: Fix the WAL snapshot dumped into file may contains invalid "being_ref_count" #4987

Merged
merged 19 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dbms/src/Encryption/MockKeyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Encryption/MockKeyManager.h>
#include <Storages/Transaction/FileEncryption.h>
#include <common/logger_useful.h>
#include <fmt/core.h>

#include <iostream>
Expand All @@ -40,12 +42,14 @@ MockKeyManager::MockKeyManager(EncryptionMethod method_, const String & key_, co
, key{key_}
, iv{iv}
, encryption_enabled{encryption_enabled_}
, logger(DB::Logger::get("MockKeyManager"))
{}

FileEncryptionInfo MockKeyManager::newFile(const String & fname)
{
if (encryption_enabled)
{
LOG_FMT_TRACE(logger, "Create mock encryption [file={}]", fname);
files.emplace_back(fname);
}
return getFile(fname);
Expand All @@ -64,6 +68,7 @@ void MockKeyManager::deleteFile(const String & fname, bool throw_on_error)
{
if (*iter == fname)
{
LOG_FMT_TRACE(logger, "Delete mock encryption [file={}]", fname);
files.erase(iter);
break;
}
Expand All @@ -80,6 +85,7 @@ void MockKeyManager::linkFile(const String & src_fname, const String & dst_fname
{
throw DB::Exception(fmt::format("Can't find file which name is {}", src_fname), DB::ErrorCodes::LOGICAL_ERROR);
}
LOG_FMT_TRACE(logger, "Link mock encryption file [src_file={}] [dst_file={}]", src_fname, dst_fname);
files.emplace_back(dst_fname);
}
}
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Encryption/MockKeyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

namespace DB
{
class MockKeyManager : public KeyManager
class Logger;
using LoggerPtr = std::shared_ptr<Logger>;

class MockKeyManager final : public KeyManager
{
public:
~MockKeyManager() = default;
~MockKeyManager() override = default;

MockKeyManager(bool encryption_enabled_ = true);
explicit MockKeyManager(bool encryption_enabled_ = true);

MockKeyManager(EncryptionMethod method_, const String & key_, const String & iv, bool encryption_enabled_ = true);

Expand All @@ -50,5 +53,7 @@ class MockKeyManager : public KeyManager
String key;
String iv;
bool encryption_enabled;

LoggerPtr logger;
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class PageStorage : private boost::noncopyable
SettingUInt64 blob_block_alignment_bytes = 0;

SettingUInt64 wal_roll_size = PAGE_META_ROLL_SIZE;
SettingUInt64 wal_recover_mode = 0;
SettingUInt64 wal_recover_mode = 0x00; // WALRecoveryMode::TolerateCorruptedTailRecords
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not simply

Suggested change
SettingUInt64 wal_recover_mode = 0x00; // WALRecoveryMode::TolerateCorruptedTailRecords
SettingUInt64 wal_recover_mode = WALRecoveryMode::TolerateCorruptedTailRecords;

SettingUInt64 wal_max_persisted_log_files = MAX_PERSISTED_LOG_FILES;

void reload(const Config & rhs)
Expand Down
22 changes: 19 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V3/MapUtils.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/Page/V3/WAL/WALReader.h>
Expand Down Expand Up @@ -1191,16 +1192,31 @@ PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) con
return std::make_pair(std::move(blob_versioned_entries), total_page_size);
}

bool PageDirectory::tryDumpSnapshot(const WriteLimiterPtr & write_limiter)
bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter)
{
bool done_any_io = false;
// In order not to make read amplification too high, only apply compact logs when ...
auto files_snap = wal->getFilesSnapshot();
if (files_snap.needSave(max_persisted_log_files))
{
// To prevent writes from affecting dumping snapshot (and vice versa), old log files
// are read from disk and a temporary PageDirectory is generated for dumping snapshot.
// The main reason write affect dumping snapshot is that we can not get a read-only
// `being_ref_count` by the function `createSnapshot()`.
assert(!files_snap.persisted_log_files.empty()); // should not be empty when `needSave` return true
auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
auto identifier = fmt::format("{}_dump_{}", wal->name(), log_num);
auto snapshot_reader = wal->createReaderForFiles(identifier, files_snap.persisted_log_files, read_limiter);
PageDirectoryFactory factory;
// we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that
// persist new logs into disk. So we pass `nullptr` as `wal` to the factory.
PageDirectoryPtr collapsed_dir = factory.createFromReader(
identifier,
std::move(snapshot_reader),
/*wal=*/nullptr);
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
// The records persisted in `files_snap` is older than or equal to all records in `edit`
auto edit = dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit), write_limiter);
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit_from_disk), write_limiter);
}
return done_any_io;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ class PageDirectory

void gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter = nullptr);

bool tryDumpSnapshot(const WriteLimiterPtr & write_limiter = nullptr);
bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr);

PageEntriesV3 gcInMemEntries();

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ namespace PS::V3
PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config)
{
auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator, config);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(storage_name), std::move(wal), config.max_persisted_log_files);
return createFromReader(storage_name, reader, std::move(wal));
}

PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal)
{
PageDirectoryPtr dir = std::make_unique<PageDirectory>(storage_name, std::move(wal));
loadFromDisk(dir, std::move(reader));

// Reset the `sequence` to the maximum of persisted.
Expand All @@ -40,7 +45,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP
// After restoring from the disk, we need cleanup all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory"), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory", storage_name), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);

if (blob_stats)
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/WALStore.h>

namespace DB
{
Expand Down Expand Up @@ -47,6 +48,8 @@ class PageDirectoryFactory

PageDirectoryPtr create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config);

PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal);

// just for test
PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
if (page_directory->tryDumpSnapshot(write_limiter))
if (page_directory->tryDumpSnapshot(read_limiter, write_limiter))
{
GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment();
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PageStorageImpl : public DB::PageStorage
const Config & config_,
const FileProviderPtr & file_provider_);

~PageStorageImpl();
~PageStorageImpl() override;

static BlobStore::Config parseBlobConfig(const Config & config)
{
Expand All @@ -54,8 +54,8 @@ class PageStorageImpl : public DB::PageStorage
WALStore::Config wal_config;

wal_config.roll_size = config.wal_roll_size;
wal_config.wal_recover_mode = config.wal_recover_mode;
wal_config.max_persisted_log_files = config.wal_max_persisted_log_files;
wal_config.setRecoverMode(config.wal_recover_mode);

return wal_config;
}
Expand Down
35 changes: 14 additions & 21 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Encryption/FileProvider.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
Expand Down Expand Up @@ -46,25 +47,31 @@ std::pair<WALStorePtr, WALStoreReaderPtr> WALStore::create(
auto reader = WALStoreReader::create(storage_name,
provider,
delegator,
static_cast<WALRecoveryMode>(config.wal_recover_mode.get()));
config.getRecoverMode());
// Create a new LogFile for writing new logs
auto last_log_num = reader->lastLogNum() + 1; // TODO reuse old file
return {
std::unique_ptr<WALStore>(new WALStore(std::move(storage_name), delegator, provider, last_log_num, std::move(config))),
reader};
}

WALStoreReaderPtr WALStore::createReaderForFiles(const String & identifier, const LogFilenameSet & log_filenames, const ReadLimiterPtr & read_limiter)
{
return WALStoreReader::create(identifier, provider, log_filenames, config.getRecoverMode(), read_limiter);
}

WALStore::WALStore(
String storage_name,
String storage_name_,
const PSDiskDelegatorPtr & delegator_,
const FileProviderPtr & provider_,
Format::LogNumberType last_log_num_,
WALStore::Config config_)
: delegator(delegator_)
: storage_name(std::move(storage_name_))
, delegator(delegator_)
, provider(provider_)
, last_log_num(last_log_num_)
, wal_paths_index(0)
, logger(Logger::get("WALStore", std::move(storage_name)))
, logger(Logger::get("WALStore", storage_name))
, config(config_)
{
}
Expand Down Expand Up @@ -186,7 +193,7 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire

LOG_FMT_INFO(logger, "Saving directory snapshot");

// Use {largest_log_num + 1, 1} to save the `edit`
// Use {largest_log_num, 1} to save the `edit`
const auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
// Create a temporary file for saving directory snapshot
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*manual_flush*/ true);
Expand All @@ -212,25 +219,11 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire
true);
LOG_FMT_INFO(logger, "Rename log file to normal done [fullname={}]", normal_fullname);

// #define ARCHIVE_COMPACTED_LOGS // keep for debug

// Remove compacted log files.
for (const auto & filename : files_snap.persisted_log_files)
{
if (auto f = Poco::File(filename.fullname(LogFileStage::Normal)); f.exists())
{
#ifndef ARCHIVE_COMPACTED_LOGS
f.remove();
#else
const Poco::Path archive_path(delegator->defaultPath(), "archive");
Poco::File archive_dir(archive_path);
if (!archive_dir.exists())
archive_dir.createDirectory();
auto dest = archive_path.toString() + "/" + filename.filename(LogFileStage::Normal);
f.moveTo(dest);
LOG_FMT_INFO(logger, "archive {} to {}", filename.fullname(LogFileStage::Normal), dest);
#endif
}
const auto log_fullname = filename.fullname(LogFileStage::Normal);
provider->deleteRegularFile(log_fullname, EncryptionPath(log_fullname, ""));
Comment on lines +225 to +226
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix #4986

jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
}

FmtBuffer fmt_buf;
Expand Down
36 changes: 33 additions & 3 deletions dbms/src/Storages/Page/V3/WALStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,56 @@ class WALStore
struct Config
{
SettingUInt64 roll_size = PAGE_META_ROLL_SIZE;
SettingUInt64 wal_recover_mode = 0;
SettingUInt64 max_persisted_log_files = MAX_PERSISTED_LOG_FILES;

private:
SettingUInt64 wal_recover_mode = 0;

public:
void setRecoverMode(UInt64 recover_mode)
{
if (unlikely(recover_mode != static_cast<UInt64>(WALRecoveryMode::TolerateCorruptedTailRecords)
&& recover_mode != static_cast<UInt64>(WALRecoveryMode::AbsoluteConsistency)
&& recover_mode != static_cast<UInt64>(WALRecoveryMode::PointInTimeRecovery)
&& recover_mode != static_cast<UInt64>(WALRecoveryMode::SkipAnyCorruptedRecords)))
{
throw Exception("Unknow recover mode [num={}]", recover_mode);
}
wal_recover_mode = recover_mode;
}

WALRecoveryMode getRecoverMode()
{
return static_cast<WALRecoveryMode>(wal_recover_mode.get());
}
};

constexpr static const char * wal_folder_prefix = "/wal";

static std::pair<WALStorePtr, WALStoreReaderPtr>
create(
String storage_name,
String storage_name_,
FileProviderPtr & provider,
PSDiskDelegatorPtr & delegator,
WALStore::Config config);

WALStoreReaderPtr createReaderForFiles(const String & identifier, const LogFilenameSet & log_filenames, const ReadLimiterPtr & read_limiter);

void apply(PageEntriesEdit & edit, const PageVersion & version, const WriteLimiterPtr & write_limiter = nullptr);
void apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write_limiter = nullptr);

struct FilesSnapshot
{
Format::LogNumberType current_writting_log_num;
// The log files to generate snapshot from. Sorted by <log number, log level>.
// If the WAL log file is not inited, it is an empty set.
LogFilenameSet persisted_log_files;

// Note that persisted_log_files should not be empty for needSave() == true,
// cause we get the largest log num from persisted_log_files as the new
// file name.
bool needSave(const size_t & max_size) const
{
// TODO: Make it configurable and check the reasonable of this number
return persisted_log_files.size() > max_size;
}
};
Expand All @@ -121,6 +147,8 @@ class WALStore
PageEntriesEdit && directory_snap,
const WriteLimiterPtr & write_limiter = nullptr);

const String & name() { return storage_name; }

private:
WALStore(
String storage_name,
Expand All @@ -134,6 +162,8 @@ class WALStore
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
bool manual_flush);

private:
const String storage_name;
PSDiskDelegatorPtr delegator;
FileProviderPtr provider;
mutable std::mutex log_file_mutex;
Expand Down
13 changes: 4 additions & 9 deletions dbms/src/Storages/Page/V3/tests/entries_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/tests/TiFlashStorageTestBasic.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <gtest/gtest.h>

namespace DB
{
Expand Down Expand Up @@ -221,7 +222,9 @@ inline ::testing::AssertionResult getEntryNotExist(
String error;
try
{
auto id_entry = dir->get(page_id, snap);
auto id_entry = dir->getOrNull(page_id, snap);
if (!id_entry.second.isValid())
return ::testing::AssertionSuccess();
error = fmt::format(
"Expect entry [id={}] from {} with snap{} not exist, but got <{}.{}, {}>",
page_id_expr,
Expand All @@ -231,14 +234,6 @@ inline ::testing::AssertionResult getEntryNotExist(
id_entry.first.low,
toDebugString(id_entry.second));
}
catch (DB::Exception & ex)
{
if (ex.code() == ErrorCodes::PS_ENTRY_NOT_EXISTS || ex.code() == ErrorCodes::PS_ENTRY_NO_VALID_VERSION)
return ::testing::AssertionSuccess();
else
error = ex.displayText();
return ::testing::AssertionFailure(::testing::Message(error.c_str()));
}
catch (...)
{
error = getCurrentExceptionMessage(true);
Expand Down
Loading