Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage: Fix the WAL snapshot dumped into file may contains invalid "being_ref_count" (#4987) #4992

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dbms/src/Encryption/FileProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,12 @@ void FileProvider::deleteRegularFile(const String & file_path_, const Encryption
{
throw DB::TiFlashException("File: " + data_file.path() + " is not a regular file", Errors::Encryption::Internal);
}
key_manager->deleteFile(encryption_path_.full_path, true);
// Remove the file on disk before removing the encryption key. Or we may leave an encrypted file without the encryption key
// and the encrypted file can not be read.
// In the worst case that TiFlash crash between removing the file on disk and removing the encryption key, we may leave
// the encryption key not deleted. However, this is a rare case and won't cause serious problem.
data_file.remove(false);
key_manager->deleteFile(encryption_path_.full_path, true);
}
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Encryption/FileProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class FileProvider

// If dir_path_as_encryption_path is true, use dir_path_ as EncryptionPath
// If false, use every file's path inside dir_path_ as EncryptionPath
// Note this method is not atomic, and after calling it, the files in dir_path_ cannot be read again.
void deleteDirectory(
const String & dir_path_,
bool dir_path_as_encryption_path = false,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Encryption/MockKeyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Encryption/MockKeyManager.h>
#include <Storages/Transaction/FileEncryption.h>
#include <common/logger_useful.h>
#include <fmt/core.h>

#include <iostream>
Expand All @@ -40,12 +42,14 @@ MockKeyManager::MockKeyManager(EncryptionMethod method_, const String & key_, co
, key{key_}
, iv{iv}
, encryption_enabled{encryption_enabled_}
, logger(DB::Logger::get("MockKeyManager"))
{}

FileEncryptionInfo MockKeyManager::newFile(const String & fname)
{
if (encryption_enabled)
{
LOG_FMT_TRACE(logger, "Create mock encryption [file={}]", fname);
files.emplace_back(fname);
}
return getFile(fname);
Expand All @@ -64,6 +68,7 @@ void MockKeyManager::deleteFile(const String & fname, bool throw_on_error)
{
if (*iter == fname)
{
LOG_FMT_TRACE(logger, "Delete mock encryption [file={}]", fname);
files.erase(iter);
break;
}
Expand All @@ -80,6 +85,7 @@ void MockKeyManager::linkFile(const String & src_fname, const String & dst_fname
{
throw DB::Exception(fmt::format("Can't find file which name is {}", src_fname), DB::ErrorCodes::LOGICAL_ERROR);
}
LOG_FMT_TRACE(logger, "Link mock encryption file [src_file={}] [dst_file={}]", src_fname, dst_fname);
files.emplace_back(dst_fname);
}
}
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Encryption/MockKeyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

namespace DB
{
class MockKeyManager : public KeyManager
class Logger;
using LoggerPtr = std::shared_ptr<Logger>;

class MockKeyManager final : public KeyManager
{
public:
~MockKeyManager() = default;
~MockKeyManager() override = default;

MockKeyManager(bool encryption_enabled_ = true);
explicit MockKeyManager(bool encryption_enabled_ = true);

MockKeyManager(EncryptionMethod method_, const String & key_, const String & iv, bool encryption_enabled_ = true);

Expand All @@ -50,5 +53,7 @@ class MockKeyManager : public KeyManager
String key;
String iv;
bool encryption_enabled;

LoggerPtr logger;
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void GlobalStoragePool::restore()

bool GlobalStoragePool::gc()
{
return gc(Settings(), true, DELTA_MERGE_GC_PERIOD);
return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD);
}

bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageUtil.h>
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/WALRecoveryMode.h>
#include <Storages/Page/WriteBatch.h>
#include <common/logger_useful.h>
#include <fmt/format.h>
Expand Down Expand Up @@ -140,7 +141,7 @@ class PageStorage : private boost::noncopyable
SettingUInt64 blob_block_alignment_bytes = 0;

SettingUInt64 wal_roll_size = PAGE_META_ROLL_SIZE;
SettingUInt64 wal_recover_mode = 0;
SettingUInt64 wal_recover_mode = static_cast<UInt64>(WALRecoveryMode::TolerateCorruptedTailRecords);
SettingUInt64 wal_max_persisted_log_files = MAX_PERSISTED_LOG_FILES;

void reload(const Config & rhs)
Expand Down
22 changes: 19 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V3/MapUtils.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/Page/V3/WAL/WALReader.h>
Expand Down Expand Up @@ -1191,16 +1192,31 @@ PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) con
return std::make_pair(std::move(blob_versioned_entries), total_page_size);
}

bool PageDirectory::tryDumpSnapshot(const WriteLimiterPtr & write_limiter)
bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter)
{
bool done_any_io = false;
// In order not to make read amplification too high, only apply compact logs when ...
auto files_snap = wal->getFilesSnapshot();
if (files_snap.needSave(max_persisted_log_files))
{
// To prevent writes from affecting dumping snapshot (and vice versa), old log files
// are read from disk and a temporary PageDirectory is generated for dumping snapshot.
// The main reason write affect dumping snapshot is that we can not get a read-only
// `being_ref_count` by the function `createSnapshot()`.
assert(!files_snap.persisted_log_files.empty()); // should not be empty when `needSave` return true
auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
auto identifier = fmt::format("{}_dump_{}", wal->name(), log_num);
auto snapshot_reader = wal->createReaderForFiles(identifier, files_snap.persisted_log_files, read_limiter);
PageDirectoryFactory factory;
// we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that
// persist new logs into disk. So we pass `nullptr` as `wal` to the factory.
PageDirectoryPtr collapsed_dir = factory.createFromReader(
identifier,
std::move(snapshot_reader),
/*wal=*/nullptr);
// The records persisted in `files_snap` is older than or equal to all records in `edit`
auto edit = dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit), write_limiter);
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
done_any_io = wal->saveSnapshot(std::move(files_snap), std::move(edit_from_disk), write_limiter);
}
return done_any_io;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ class PageDirectory

void gcApply(PageEntriesEdit && migrated_edit, const WriteLimiterPtr & write_limiter = nullptr);

bool tryDumpSnapshot(const WriteLimiterPtr & write_limiter = nullptr);
bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr);

PageEntriesV3 gcInMemEntries();

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ namespace PS::V3
PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config)
{
auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator, config);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(storage_name), std::move(wal), config.max_persisted_log_files);
return createFromReader(storage_name, reader, std::move(wal));
}

PageDirectoryPtr PageDirectoryFactory::createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal)
{
PageDirectoryPtr dir = std::make_unique<PageDirectory>(storage_name, std::move(wal));
loadFromDisk(dir, std::move(reader));

// Reset the `sequence` to the maximum of persisted.
Expand All @@ -40,7 +45,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP
// After restoring from the disk, we need cleanup all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory"), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory", storage_name), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);

if (blob_stats)
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/WALStore.h>

namespace DB
{
Expand Down Expand Up @@ -47,6 +48,8 @@ class PageDirectoryFactory

PageDirectoryPtr create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config);

PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal);

// just for test
PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
if (page_directory->tryDumpSnapshot(write_limiter))
if (page_directory->tryDumpSnapshot(read_limiter, write_limiter))
{
GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment();
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PageStorageImpl : public DB::PageStorage
const Config & config_,
const FileProviderPtr & file_provider_);

~PageStorageImpl();
~PageStorageImpl() override;

static BlobStore::Config parseBlobConfig(const Config & config)
{
Expand All @@ -54,8 +54,8 @@ class PageStorageImpl : public DB::PageStorage
WALStore::Config wal_config;

wal_config.roll_size = config.wal_roll_size;
wal_config.wal_recover_mode = config.wal_recover_mode;
wal_config.max_persisted_log_files = config.wal_max_persisted_log_files;
wal_config.setRecoverMode(config.wal_recover_mode);

return wal_config;
}
Expand Down
35 changes: 14 additions & 21 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Encryption/FileProvider.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
Expand Down Expand Up @@ -46,25 +47,31 @@ std::pair<WALStorePtr, WALStoreReaderPtr> WALStore::create(
auto reader = WALStoreReader::create(storage_name,
provider,
delegator,
static_cast<WALRecoveryMode>(config.wal_recover_mode.get()));
config.getRecoverMode());
// Create a new LogFile for writing new logs
auto last_log_num = reader->lastLogNum() + 1; // TODO reuse old file
return {
std::unique_ptr<WALStore>(new WALStore(std::move(storage_name), delegator, provider, last_log_num, std::move(config))),
reader};
}

WALStoreReaderPtr WALStore::createReaderForFiles(const String & identifier, const LogFilenameSet & log_filenames, const ReadLimiterPtr & read_limiter)
{
return WALStoreReader::create(identifier, provider, log_filenames, config.getRecoverMode(), read_limiter);
}

WALStore::WALStore(
String storage_name,
String storage_name_,
const PSDiskDelegatorPtr & delegator_,
const FileProviderPtr & provider_,
Format::LogNumberType last_log_num_,
WALStore::Config config_)
: delegator(delegator_)
: storage_name(std::move(storage_name_))
, delegator(delegator_)
, provider(provider_)
, last_log_num(last_log_num_)
, wal_paths_index(0)
, logger(Logger::get("WALStore", std::move(storage_name)))
, logger(Logger::get("WALStore", storage_name))
, config(config_)
{
}
Expand Down Expand Up @@ -186,7 +193,7 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire

LOG_FMT_INFO(logger, "Saving directory snapshot");

// Use {largest_log_num + 1, 1} to save the `edit`
// Use {largest_log_num, 1} to save the `edit`
const auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
// Create a temporary file for saving directory snapshot
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*manual_flush*/ true);
Expand All @@ -212,25 +219,11 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire
true);
LOG_FMT_INFO(logger, "Rename log file to normal done [fullname={}]", normal_fullname);

// #define ARCHIVE_COMPACTED_LOGS // keep for debug

// Remove compacted log files.
for (const auto & filename : files_snap.persisted_log_files)
{
if (auto f = Poco::File(filename.fullname(LogFileStage::Normal)); f.exists())
{
#ifndef ARCHIVE_COMPACTED_LOGS
f.remove();
#else
const Poco::Path archive_path(delegator->defaultPath(), "archive");
Poco::File archive_dir(archive_path);
if (!archive_dir.exists())
archive_dir.createDirectory();
auto dest = archive_path.toString() + "/" + filename.filename(LogFileStage::Normal);
f.moveTo(dest);
LOG_FMT_INFO(logger, "archive {} to {}", filename.fullname(LogFileStage::Normal), dest);
#endif
}
const auto log_fullname = filename.fullname(LogFileStage::Normal);
provider->deleteRegularFile(log_fullname, EncryptionPath(log_fullname, ""));
}

FmtBuffer fmt_buf;
Expand Down
Loading