Skip to content

Commit

Permalink
PageStorage: remove wal files according to its max sequence when dump…
Browse files Browse the repository at this point in the history
… snapshot (#7779)

close #7780
  • Loading branch information
lidezhu authored Jul 13, 2023
1 parent a4f999a commit 892c2ca
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 54 deletions.
21 changes: 21 additions & 0 deletions dbms/src/Storages/Page/V3/LogFile/LogFilename.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Storages/Page/V3/LogFile/LogFormat.h>

#include <boost/container_hash/hash_fwd.hpp>
#include <set>

namespace DB::PS::V3
Expand Down Expand Up @@ -56,6 +57,11 @@ struct LogFilename
assert(!parent_path.empty());
return fmt::format("{}/{}", parent_path, filename(file_stage));
}

bool operator==(const LogFilename & other) const
{
return log_num == other.log_num && level_num == other.level_num;
}
};

struct LogFilenameCmp
Expand All @@ -70,3 +76,18 @@ struct LogFilenameCmp
using LogFilenameSet = std::set<LogFilename, LogFilenameCmp>;

} // namespace DB::PS::V3

namespace std
{
template <>
struct hash<DB::PS::V3::LogFilename>
{
size_t operator()(const DB::PS::V3::LogFilename & name) const
{
size_t seed = 0;
boost::hash_combine(seed, boost::hash_value(name.log_num));
boost::hash_combine(seed, boost::hash_value(name.level_num));
return seed;
}
};
} // namespace std
18 changes: 9 additions & 9 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,7 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
watch.restart();
SCOPE_EXIT({ GET_METRIC(tiflash_storage_page_write_duration_seconds, type_commit).Observe(watch.elapsedSeconds()); });

SYNC_FOR("before_PageDirectory::apply_to_memory");
std::unordered_set<String> applied_data_files;
{
std::unique_lock table_lock(table_rw_mutex);
Expand Down Expand Up @@ -1816,22 +1817,21 @@ PageDirectory<Trait>::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_i
template <typename Trait>
bool PageDirectory<Trait>::tryDumpSnapshot(const WriteLimiterPtr & write_limiter, bool force)
{
auto identifier = fmt::format("{}.dump", wal->name());
auto snap = createSnapshot(identifier);

// Only apply compact logs when files snapshot is valid
auto files_snap = wal->tryGetFilesSnapshot(max_persisted_log_files, force);
auto files_snap = wal->tryGetFilesSnapshot(
max_persisted_log_files,
snap->sequence,
details::getMaxSequenceForRecord<Trait>,
force);
if (!files_snap.isValid())
return false;

// To prevent writes from affecting dumping snapshot (and vice versa), old log files
// are read from disk and a temporary PageDirectory is generated for dumping snapshot.
// The main reason write affect dumping snapshot is that we can not get a read-only
// `being_ref_count` by the function `createSnapshot()`.
assert(!files_snap.persisted_log_files.empty()); // should not be empty
auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
auto identifier = fmt::format("{}.dump_{}", wal->name(), log_num);

Stopwatch watch;
// The records persisted in `files_snap` is older than or equal to all records in `edit`
auto snap = createSnapshot(identifier);
auto edit = dumpSnapshotToEdit(snap);
files_snap.num_records = edit.size();
files_snap.dump_elapsed_ms = watch.elapsedMilliseconds();
Expand Down
26 changes: 24 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,18 @@ class MultiVersionRefCount
new_versioned_ref_counts->emplace_back(ver, ref_count_delta);
}
}
if (ref_count_delta_in_snap == 0 && new_versioned_ref_counts->empty())
if (ref_count_delta_in_snap == 0)
{
versioned_ref_counts = nullptr;
if (new_versioned_ref_counts->empty())
{
versioned_ref_counts = nullptr;
}
else
{
// There could be some new ref count created after `snap_seq`, we need to
// keep the newly added ref counts
versioned_ref_counts.swap(new_versioned_ref_counts);
}
return;
}
RUNTIME_CHECK(ref_count_delta_in_snap > 0, deref_count_delta, ref_count_delta_in_snap);
Expand Down Expand Up @@ -623,6 +632,19 @@ class PageDirectory
LoggerPtr log;
};


namespace details
{
template <typename Trait>
UInt64 getMaxSequenceForRecord(const String & record)
{
auto edit = Trait::Serializer::deserializeFrom(record, nullptr);
const auto & records = edit.getRecords();
RUNTIME_CHECK(!records.empty());
return records.back().version.sequence;
}
} // namespace details

namespace u128
{
struct PageDirectoryTrait
Expand Down
77 changes: 52 additions & 25 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,56 @@ WALStoreReaderPtr WALStoreReader::create(
return create(std::move(storage_name), provider, std::move(log_files), recovery_mode_, read_limiter);
}

LogReaderPtr WALStoreReader::createLogReader(
const LogFilename & filename,
FileProviderPtr & provider,
ReportCollector * reporter,
WALRecoveryMode recovery_mode,
const ReadLimiterPtr & read_limiter,
LoggerPtr logger)
{
const auto log_num = filename.log_num;
const auto fullname = filename.fullname(filename.stage);
LOG_DEBUG(logger, "Open log file for reading [file={}]", fullname);

auto read_buf = createReadBufferFromFileBaseByFileProvider(
provider,
fullname,
EncryptionPath{fullname, ""},
/*estimated_size*/ Format::BLOCK_SIZE,
/*aio_threshold*/ 0,
/*read_limiter*/ read_limiter,
/*buffer_size*/ Format::BLOCK_SIZE // Must be `Format::BLOCK_SIZE`
);
return std::make_unique<LogReader>(
std::move(read_buf),
reporter,
/*verify_checksum*/ true,
log_num,
recovery_mode);
}

String WALStoreReader::getLastRecordInLogFile(
const LogFilename & filename,
FileProviderPtr & provider,
WALRecoveryMode recovery_mode,
const ReadLimiterPtr & read_limiter,
LoggerPtr logger)
{
ReportCollector reporter;
auto log_reader = createLogReader(filename, provider, &reporter, recovery_mode, read_limiter, logger);
String last_record;
while (true)
{
auto [ok, record] = log_reader->readRecord();
if (!ok)
break;

last_record = std::move(record);
}
return last_record;
}

WALStoreReader::WALStoreReader(String storage_name,
FileProviderPtr & provider_,
std::optional<LogFilename> checkpoint,
Expand Down Expand Up @@ -209,37 +259,14 @@ bool WALStoreReader::openNextFile()
return false;
}

auto do_open = [this](const LogFilename & next_file) {
const auto log_num = next_file.log_num;
const auto filename = next_file.filename(next_file.stage);
const auto fullname = next_file.fullname(next_file.stage);
LOG_DEBUG(logger, "Open log file for reading [file={}]", fullname);

auto read_buf = createReadBufferFromFileBaseByFileProvider(
provider,
fullname,
EncryptionPath{fullname, ""},
/*estimated_size*/ Format::BLOCK_SIZE,
/*aio_threshold*/ 0,
/*read_limiter*/ read_limiter,
/*buffer_size*/ Format::BLOCK_SIZE // Must be `Format::BLOCK_SIZE`
);
reader = std::make_unique<LogReader>(
std::move(read_buf),
&reporter,
/*verify_checksum*/ true,
log_num,
recovery_mode);
};

if (!checkpoint_read_done)
{
do_open(*checkpoint_file);
reader = createLogReader(*checkpoint_file, provider, &reporter, recovery_mode, read_limiter, logger);
checkpoint_read_done = true;
}
else
{
do_open(*next_reading_file);
reader = createLogReader(*next_reading_file, provider, &reporter, recovery_mode, read_limiter, logger);
++next_reading_file;
}
return true;
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Storages/Page/V3/WAL/WALReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class ReportCollector : public LogReader::Reporter
bool error_happened = false;
};

using LogReaderPtr = std::unique_ptr<LogReader>;

class WALStoreReader
{
public:
Expand All @@ -67,6 +69,21 @@ class WALStoreReader
WALRecoveryMode recovery_mode_ = WALRecoveryMode::TolerateCorruptedTailRecords,
const ReadLimiterPtr & read_limiter = nullptr);

static LogReaderPtr createLogReader(
const LogFilename & filename,
FileProviderPtr & provider,
ReportCollector * reporter,
WALRecoveryMode recovery_mode,
const ReadLimiterPtr & read_limiter,
LoggerPtr logger);

static String getLastRecordInLogFile(
const LogFilename & filename,
FileProviderPtr & provider,
WALRecoveryMode recovery_mode,
const ReadLimiterPtr & read_limiter,
LoggerPtr logger);

bool remained() const;

std::optional<String> next();
Expand Down
57 changes: 46 additions & 11 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void WALStore::updateDiskUsage(const LogFilenameSet & log_filenames)
}
}

WALStore::FilesSnapshot WALStore::tryGetFilesSnapshot(size_t max_persisted_log_files, bool force)
WALStore::FilesSnapshot WALStore::tryGetFilesSnapshot(size_t max_persisted_log_files, UInt64 snap_sequence, std::function<UInt64(const String & record)> max_sequence_getter, bool force)
{
// First we simply check whether the number of files is enough for compaction
LogFilenameSet persisted_log_files = WALStoreReader::listAllFiles(delegator, logger);
Expand Down Expand Up @@ -192,15 +192,23 @@ WALStore::FilesSnapshot WALStore::tryGetFilesSnapshot(size_t max_persisted_log_f
log_file.reset();
}

for (auto iter = persisted_log_files.begin(); iter != persisted_log_files.end(); /*empty*/)
// traverse in reverse order,
// so that once the first log file whose max sequence is smaller or equal to snap_sequence is found,
// we don't need to check the max sequence for the rest log files.
bool found_log_file_smaller_than_snap_sequence = false;
LogFilenameSet snap_log_files;
for (auto iter = persisted_log_files.rbegin(); iter != persisted_log_files.rend(); ++iter) // NOLINT
{
if (iter->log_num >= current_writing_log_num)
iter = persisted_log_files.erase(iter);
else
++iter;
continue;
if (!found_log_file_smaller_than_snap_sequence && getLogFileMaxSequence(*iter, max_sequence_getter) > snap_sequence)
continue;

found_log_file_smaller_than_snap_sequence = true;
snap_log_files.emplace(*iter);
}
return WALStore::FilesSnapshot{
.persisted_log_files = std::move(persisted_log_files),
.persisted_log_files = std::move(snap_log_files),
};
}

Expand Down Expand Up @@ -242,11 +250,7 @@ bool WALStore::saveSnapshot(
LOG_INFO(logger, "Rename log file to normal done [tempname={}] [fullname={}]", temp_fullname, normal_fullname);

// Remove compacted log files.
for (const auto & filename : files_snap.persisted_log_files)
{
const auto log_fullname = filename.fullname(LogFileStage::Normal);
provider->deleteRegularFile(log_fullname, EncryptionPath(log_fullname, ""));
}
removeLogFiles(files_snap.persisted_log_files);

auto get_logging_str = [&]() {
FmtBuffer fmt_buf;
Expand All @@ -270,4 +274,35 @@ bool WALStore::saveSnapshot(
return true;
}

void WALStore::removeLogFiles(const LogFilenameSet & log_filenames)
{
for (const auto & filename : log_filenames)
{
const auto log_fullname = filename.fullname(LogFileStage::Normal);
provider->deleteRegularFile(log_fullname, EncryptionPath(log_fullname, ""));
}
{
std::unique_lock<std::mutex> lock(log_file_max_sequences_cache_mutex);
for (const auto & filename : log_filenames)
{
log_file_max_sequences_cache.erase(filename);
}
}
}

UInt64 WALStore::getLogFileMaxSequence(const LogFilename & log_filename, std::function<UInt64(const String & record)> max_sequence_getter)
{
std::unique_lock<std::mutex> lock(log_file_max_sequences_cache_mutex);
auto iter = log_file_max_sequences_cache.find(log_filename);
if (iter != log_file_max_sequences_cache.end())
return iter->second;

auto last_record = WALStoreReader::getLastRecordInLogFile(log_filename, provider, config.getRecoverMode(), /*read_limiter*/ nullptr, logger);
if (last_record.empty())
return 0; // empty log file

UInt64 max_sequence = max_sequence_getter(last_record);
log_file_max_sequences_cache.emplace(log_filename, max_sequence);
return max_sequence;
}
} // namespace DB::PS::V3
9 changes: 8 additions & 1 deletion dbms/src/Storages/Page/V3/WALStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class WALStore
}
};

FilesSnapshot tryGetFilesSnapshot(size_t max_persisted_log_files, bool force);
FilesSnapshot tryGetFilesSnapshot(size_t max_persisted_log_files, UInt64 snap_sequence, std::function<UInt64(const String & record)> max_sequence_getter, bool force);

bool saveSnapshot(
FilesSnapshot && files_snap,
Expand All @@ -119,6 +119,10 @@ class WALStore

void updateDiskUsage(const LogFilenameSet & log_filenames);

void removeLogFiles(const LogFilenameSet & log_filenames);

UInt64 getLogFileMaxSequence(const LogFilename & log_filename, std::function<UInt64(const String & record)> max_sequence_getter);

private:
const String storage_name;
PSDiskDelegatorPtr delegator;
Expand All @@ -129,6 +133,9 @@ class WALStore
UInt32 wal_paths_index;
std::unique_ptr<LogWriter> log_file;

mutable std::mutex log_file_max_sequences_cache_mutex;
std::unordered_map<LogFilename, UInt64> log_file_max_sequences_cache;

// Cached values when `tryGetFilesSnapshot` is called
mutable std::mutex mtx_disk_usage;
size_t num_log_files;
Expand Down
Loading

0 comments on commit 892c2ca

Please sign in to comment.