Skip to content

Commit

Permalink
PageStorage: Fix the WAL snapshot dumped into file may contain inval…
Browse files Browse the repository at this point in the history
…id "being_ref_count" (#4987)

close #4957, close #4986
  • Loading branch information
JaySon-Huang authored May 24, 2022
1 parent 3674439 commit 1412c25
Show file tree
Hide file tree
Showing 19 changed files with 295 additions and 87 deletions.
6 changes: 5 additions & 1 deletion dbms/src/Encryption/FileProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,12 @@ void FileProvider::deleteRegularFile(const String & file_path_, const Encryption
{
throw DB::TiFlashException("File: " + data_file.path() + " is not a regular file", Errors::Encryption::Internal);
}
key_manager->deleteFile(encryption_path_.full_path, true);
// Remove the file on disk before removing the encryption key. Otherwise we may leave an encrypted file without its
// encryption key, and that encrypted file can no longer be read.
// In the worst case, TiFlash crashes between removing the file on disk and removing the encryption key, and the
// encryption key is left undeleted. However, this is a rare case and won't cause serious problems.
data_file.remove(false);
key_manager->deleteFile(encryption_path_.full_path, true);
}
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Encryption/FileProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class FileProvider

// If dir_path_as_encryption_path is true, use dir_path_ as EncryptionPath
// If false, use every file's path inside dir_path_ as EncryptionPath
// Note this method is not atomic, and after calling it, the files in dir_path_ cannot be read again.
void deleteDirectory(
const String & dir_path_,
bool dir_path_as_encryption_path = false,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Encryption/MockKeyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Encryption/MockKeyManager.h>
#include <Storages/Transaction/FileEncryption.h>
#include <common/logger_useful.h>
#include <fmt/core.h>

#include <iostream>
Expand All @@ -40,12 +42,14 @@ MockKeyManager::MockKeyManager(EncryptionMethod method_, const String & key_, co
, key{key_}
, iv{iv}
, encryption_enabled{encryption_enabled_}
, logger(DB::Logger::get("MockKeyManager"))
{}

FileEncryptionInfo MockKeyManager::newFile(const String & fname)
{
if (encryption_enabled)
{
LOG_FMT_TRACE(logger, "Create mock encryption [file={}]", fname);
files.emplace_back(fname);
}
return getFile(fname);
Expand All @@ -64,6 +68,7 @@ void MockKeyManager::deleteFile(const String & fname, bool throw_on_error)
{
if (*iter == fname)
{
LOG_FMT_TRACE(logger, "Delete mock encryption [file={}]", fname);
files.erase(iter);
break;
}
Expand All @@ -80,6 +85,7 @@ void MockKeyManager::linkFile(const String & src_fname, const String & dst_fname
{
throw DB::Exception(fmt::format("Can't find file which name is {}", src_fname), DB::ErrorCodes::LOGICAL_ERROR);
}
LOG_FMT_TRACE(logger, "Link mock encryption file [src_file={}] [dst_file={}]", src_fname, dst_fname);
files.emplace_back(dst_fname);
}
}
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Encryption/MockKeyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

namespace DB
{
class MockKeyManager : public KeyManager
class Logger;
using LoggerPtr = std::shared_ptr<Logger>;

class MockKeyManager final : public KeyManager
{
public:
~MockKeyManager() = default;
~MockKeyManager() override = default;

MockKeyManager(bool encryption_enabled_ = true);
explicit MockKeyManager(bool encryption_enabled_ = true);

MockKeyManager(EncryptionMethod method_, const String & key_, const String & iv, bool encryption_enabled_ = true);

Expand All @@ -50,5 +53,7 @@ class MockKeyManager : public KeyManager
String key;
String iv;
bool encryption_enabled;

LoggerPtr logger;
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void GlobalStoragePool::restore()

bool GlobalStoragePool::gc()
{
return gc(Settings(), true, DELTA_MERGE_GC_PERIOD);
return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD);
}

bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageUtil.h>
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/WALRecoveryMode.h>
#include <Storages/Page/WriteBatch.h>
#include <common/logger_useful.h>
#include <fmt/format.h>
Expand Down Expand Up @@ -140,7 +141,7 @@ class PageStorage : private boost::noncopyable
SettingUInt64 blob_block_alignment_bytes = 0;

SettingUInt64 wal_roll_size = PAGE_META_ROLL_SIZE;
SettingUInt64 wal_recover_mode = 0;
SettingUInt64 wal_recover_mode = static_cast<UInt64>(WALRecoveryMode::TolerateCorruptedTailRecords);
SettingUInt64 wal_max_persisted_log_files = MAX_PERSISTED_LOG_FILES;

void reload(const Config & rhs)
Expand Down
22 changes: 19 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V3/MapUtils.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/Page/V3/WAL/WALReader.h>
Expand Down Expand Up @@ -1191,16 +1192,31 @@ PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) con
return std::make_pair(std::move(blob_versioned_entries), total_page_size);
}

bool PageDirectory::tryDumpSnapshot(const WriteLimiterPtr & write_limiter)
bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter)
{
bool done_any_io = false;
// In order not to make read amplification too high, only apply compact logs when ...
auto files_snap = wal->getFilesSnapshot();
if (files_snap.needSave(max_persisted_log_files))
{
// To prevent writes from affecting dumping snapshot (and vice versa), old log files
// are read from disk and a temporary PageDirectory is generated for dumping snapshot.
// The main reason writes affect dumping snapshot is that we cannot get a read-only
// `being_ref_count` from the function `createSnapshot()`.
assert(!files_snap.persisted_log_files.empty()); // should not be empty when `needSave` return true
auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
auto identifier = fmt::format("{}_dump_{}", wal->name(), log_num);
auto snapshot_reader = wal->createReaderForFiles(identifier, files_snap.persisted_log_files, read_limiter);
PageDirectoryFactory factory;
// We only use `collapsed_dir` to dump the edit of the snapshot, and must never call functions like `apply` that
// persist new logs to disk. So we pass `nullptr` as `wal` to the factory.
PageDirectoryPtr collapsed_dir = factory.createFromReader(
identifier,
std::move(snapshot_reader),
/*wal=*/nullptr);
// The records persisted in `files_snap` are older than or equal to all records in `edit`
auto edit = dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit), write_limiter);
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit_from_disk), write_limiter);
}
return done_any_io;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ class PageDirectory

void gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter = nullptr);

bool tryDumpSnapshot(const WriteLimiterPtr & write_limiter = nullptr);
bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr);

PageEntriesV3 gcInMemEntries();

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ namespace PS::V3
PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config)
{
auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator, config);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(storage_name), std::move(wal), config.max_persisted_log_files);
return createFromReader(storage_name, reader, std::move(wal));
}

PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal)
{
PageDirectoryPtr dir = std::make_unique<PageDirectory>(storage_name, std::move(wal));
loadFromDisk(dir, std::move(reader));

// Reset the `sequence` to the maximum of persisted.
Expand All @@ -40,7 +45,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP
// After restoring from the disk, we need to clean up all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory"), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory", storage_name), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);

if (blob_stats)
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/WALStore.h>

namespace DB
{
Expand Down Expand Up @@ -47,6 +48,8 @@ class PageDirectoryFactory

PageDirectoryPtr create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config);

PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal);

// just for test
PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
if (page_directory->tryDumpSnapshot(write_limiter))
if (page_directory->tryDumpSnapshot(read_limiter, write_limiter))
{
GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment();
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PageStorageImpl : public DB::PageStorage
const Config & config_,
const FileProviderPtr & file_provider_);

~PageStorageImpl();
~PageStorageImpl() override;

static BlobStore::Config parseBlobConfig(const Config & config)
{
Expand All @@ -54,8 +54,8 @@ class PageStorageImpl : public DB::PageStorage
WALStore::Config wal_config;

wal_config.roll_size = config.wal_roll_size;
wal_config.wal_recover_mode = config.wal_recover_mode;
wal_config.max_persisted_log_files = config.wal_max_persisted_log_files;
wal_config.setRecoverMode(config.wal_recover_mode);

return wal_config;
}
Expand Down
35 changes: 14 additions & 21 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Encryption/FileProvider.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
Expand Down Expand Up @@ -46,25 +47,31 @@ std::pair<WALStorePtr, WALStoreReaderPtr> WALStore::create(
auto reader = WALStoreReader::create(storage_name,
provider,
delegator,
static_cast<WALRecoveryMode>(config.wal_recover_mode.get()));
config.getRecoverMode());
// Create a new LogFile for writing new logs
auto last_log_num = reader->lastLogNum() + 1; // TODO reuse old file
return {
std::unique_ptr<WALStore>(new WALStore(std::move(storage_name), delegator, provider, last_log_num, std::move(config))),
reader};
}

WALStoreReaderPtr WALStore::createReaderForFiles(const String & identifier, const LogFilenameSet & log_filenames, const ReadLimiterPtr & read_limiter)
{
    // Build a reader over exactly the given log files, reusing this store's
    // file provider and the recovery mode taken from its config. The
    // `identifier` and `read_limiter` are forwarded to the reader unchanged.
    const auto recovery_mode = config.getRecoverMode();
    return WALStoreReader::create(identifier, provider, log_filenames, recovery_mode, read_limiter);
}

WALStore::WALStore(
String storage_name,
String storage_name_,
const PSDiskDelegatorPtr & delegator_,
const FileProviderPtr & provider_,
Format::LogNumberType last_log_num_,
WALStore::Config config_)
: delegator(delegator_)
: storage_name(std::move(storage_name_))
, delegator(delegator_)
, provider(provider_)
, last_log_num(last_log_num_)
, wal_paths_index(0)
, logger(Logger::get("WALStore", std::move(storage_name)))
, logger(Logger::get("WALStore", storage_name))
, config(config_)
{
}
Expand Down Expand Up @@ -186,7 +193,7 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire

LOG_FMT_INFO(logger, "Saving directory snapshot");

// Use {largest_log_num + 1, 1} to save the `edit`
// Use {largest_log_num, 1} to save the `edit`
const auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
// Create a temporary file for saving directory snapshot
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*manual_flush*/ true);
Expand All @@ -212,25 +219,11 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire
true);
LOG_FMT_INFO(logger, "Rename log file to normal done [fullname={}]", normal_fullname);

// #define ARCHIVE_COMPACTED_LOGS // keep for debug

// Remove compacted log files.
for (const auto & filename : files_snap.persisted_log_files)
{
if (auto f = Poco::File(filename.fullname(LogFileStage::Normal)); f.exists())
{
#ifndef ARCHIVE_COMPACTED_LOGS
f.remove();
#else
const Poco::Path archive_path(delegator->defaultPath(), "archive");
Poco::File archive_dir(archive_path);
if (!archive_dir.exists())
archive_dir.createDirectory();
auto dest = archive_path.toString() + "/" + filename.filename(LogFileStage::Normal);
f.moveTo(dest);
LOG_FMT_INFO(logger, "archive {} to {}", filename.fullname(LogFileStage::Normal), dest);
#endif
}
const auto log_fullname = filename.fullname(LogFileStage::Normal);
provider->deleteRegularFile(log_fullname, EncryptionPath(log_fullname, ""));
}

FmtBuffer fmt_buf;
Expand Down
Loading

0 comments on commit 1412c25

Please sign in to comment.