Skip to content

Commit

Permalink
PageStorage: Fix some bugs (#4212)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
JaySon-Huang authored Mar 14, 2022
1 parent eef6342 commit 3d75154
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 52 deletions.
46 changes: 21 additions & 25 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ void VersionedPageEntries::createNewEntry(const PageVersionType & ver, const Pag

if (type == EditRecordType::VAR_ENTRY)
{
auto last_iter = entries.rbegin();
if (last_iter->second.isDelete())
auto last_iter = MapUtils::findLess(entries, PageVersionType(ver.sequence + 1, 0));
if (last_iter == entries.end())
{
entries.emplace(ver, EntryOrDelete::newNormalEntry(entry));
}
else if (last_iter->second.isDelete())
{
entries.emplace(ver, EntryOrDelete::newNormalEntry(entry));
}
Expand Down Expand Up @@ -302,10 +306,8 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 *
}
else if (type == EditRecordType::VAR_EXTERNAL)
{
// If we applied write batches like this: [ver=1]{putExternal 10}, [ver=2]{ref 11->10, del 10}
// then by ver=2, we should not be able to read 10, but should be able to read 11 (resolving 11 ref to 10).
// When resolving 11 to 10, we need to set `check_prev` to true.
bool ok = !is_deleted || (is_deleted && (check_prev ? (seq <= delete_ver.sequence) : (seq < delete_ver.sequence)));
// We may add reference to an external id even if it is logically deleted.
bool ok = check_prev ? true : (!is_deleted || (is_deleted && seq < delete_ver.sequence));
if (create_ver.sequence <= seq && ok)
{
return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersionType(0)};
Expand Down Expand Up @@ -354,8 +356,9 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersionType & ver)
}
else if (type == EditRecordType::VAR_EXTERNAL)
{
if (create_ver <= ver && (!is_deleted || (is_deleted && ver < delete_ver)))
if (create_ver <= ver)
{
// We may add reference to an external id even if it is logically deleted.
return ++being_ref_count;
}
}
Expand Down Expand Up @@ -397,7 +400,6 @@ PageSize VersionedPageEntries::getEntriesByBlobIds(

bool VersionedPageEntries::cleanOutdatedEntries(
UInt64 lowest_seq,
PageIdV3Internal page_id,
std::map<PageIdV3Internal, std::pair<PageVersionType, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & /*page_lock*/)
Expand All @@ -416,15 +418,6 @@ bool VersionedPageEntries::cleanOutdatedEntries(
// need to decrease the ref count by <id=iter->second.origin_page_id, ver=iter->first, num=1>
if (auto [deref_counter, new_created] = normal_entries_to_deref->emplace(std::make_pair(ori_page_id, std::make_pair(/*ver=*/create_ver, /*count=*/1))); !new_created)
{
if (deref_counter->second.first.sequence != create_ver.sequence)
{
throw Exception(fmt::format(
"There exist two different version of ref, should not happen [page_id={}] [ori_page_id={}] [ver={}] [another_ver={}]",
page_id,
ori_page_id,
create_ver,
deref_counter->second.first));
}
// the id already exists in the deref map, increase the num to decrease ref count
deref_counter->second.second += 1;
}
Expand Down Expand Up @@ -508,7 +501,8 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag
}
else if (type == EditRecordType::VAR_ENTRY)
{
// decrease the ref-counter
// Decrease the ref-counter. The entry may be moved to a newer entry with same sequence but higher epoch,
// so we need to find the one less than <seq+1, 0> and decrease the ref-counter of it.
auto iter = MapUtils::findMutLess(entries, PageVersionType(deref_ver.sequence + 1, 0));
if (iter == entries.end())
{
Expand All @@ -530,7 +524,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag

// Clean outdated entries after decreasing the ref-counter
// set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries
return cleanOutdatedEntries(lowest_seq, page_id, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
}

throw Exception(fmt::format("calling derefAndClean with invalid state [state={}]", toDebugString()));
Expand Down Expand Up @@ -863,7 +857,10 @@ void PageDirectory::applyRefEditRecord(
const VersionedPageEntriesPtr & resolve_version_list = resolve_ver_iter->second;
// If we already hold the lock from `id_to_resolve`, then we should not request it again.
// This can happen when `id_to_resolve` has other operations in the current write batch
auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId(ver_to_resolve.sequence, false, nullptr);
auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId(
ver_to_resolve.sequence,
/*check_prev=*/true,
nullptr);
switch (need_collapse)
{
case VersionedPageEntries::RESOLVE_FAIL:
Expand Down Expand Up @@ -966,7 +963,7 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write
case EditRecordType::VAR_ENTRY:
case EditRecordType::VAR_EXTERNAL:
case EditRecordType::VAR_REF:
throw Exception(fmt::format("should not handle {} edit", r.type));
throw Exception(fmt::format("should not handle edit with invalid type [type={}]", r.type));
}
}
catch (DB::Exception & e)
Expand Down Expand Up @@ -1000,7 +997,7 @@ void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiter
iter = mvcc_table_directory.find(record.page_id);
if (unlikely(iter == mvcc_table_directory.end()))
{
throw Exception(fmt::format("Can't found [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Can't find [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
}
} // release the read lock on `table_rw_mutex`

Expand Down Expand Up @@ -1126,7 +1123,6 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
// do gc on the version list without lock on `mvcc_table_directory`.
const bool all_deleted = iter->second->cleanOutdatedEntries(
lowest_seq,
/*page_id=*/iter->first,
&normal_entries_to_deref,
all_del_entries,
iter->second->acquireLock());
Expand Down Expand Up @@ -1161,8 +1157,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
const bool all_deleted = iter->second->derefAndClean(
lowest_seq,
page_id,
deref_counter.first,
deref_counter.second,
/*deref_ver=*/deref_counter.first,
/*deref_count=*/deref_counter.second,
all_del_entries);

if (all_deleted)
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Common/CurrentMetrics.h>
#include <Common/LogWithPrefix.h>
#include <Encryption/FileProvider.h>
#include <Poco/Ext/ThreadNumber.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/Snapshot.h>
Expand All @@ -17,8 +18,6 @@
#include <shared_mutex>
#include <unordered_map>

#include "Encryption/FileProvider.h"

namespace CurrentMetrics
{
extern const Metric PSMVCCNumSnapshots;
Expand Down Expand Up @@ -205,7 +204,6 @@ class VersionedPageEntries
*/
bool cleanOutdatedEntries(
UInt64 lowest_seq,
PageIdV3Internal page_id,
std::map<PageIdV3Internal, std::pair<PageVersionType, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & page_lock);
Expand Down
30 changes: 21 additions & 9 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,35 @@ PageDirectoryPtr PageDirectoryFactory::create(FileProviderPtr & file_provider, P
auto [wal, reader] = WALStore::create(file_provider, delegator);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadFromDisk(dir, std::move(reader));
// TODO: After restored ends, set the last offset of log file for `wal`
if (blob_stats)
blob_stats->restore();
// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

if (blob_stats)
{
// After all entries are restored to `mvcc_table_directory`, only apply
// the latest entry of each page to `blob_stats`. Otherwise we may hit errors, since
// some entries may have been removed in memory but not yet compacted
// in the log file.
for (const auto & [page_id, entries] : dir->mvcc_table_directory)
{
(void)page_id;
if (auto entry = entries->getEntry(max_applied_ver.sequence); entry)
{
blob_stats->restoreByEntry(*entry);
}
}

blob_stats->restore();
}

// TODO: After restoring ends, set the last offset of the log file for `wal`
return dir;
}

PageDirectoryPtr PageDirectoryFactory::createFromEdit(FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit)
{
auto [wal, reader] = WALStore::create(file_provider, delegator);
(void)reader;
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(wal));
loadEdit(dir, edit);
if (blob_stats)
Expand Down Expand Up @@ -67,8 +85,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
}
case EditRecordType::VAR_ENTRY:
version_list->fromRestored(r);
if (blob_stats)
blob_stats->restoreByEntry(r.entry);
break;
case EditRecordType::PUT_EXTERNAL:
{
Expand All @@ -82,8 +98,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
}
case EditRecordType::PUT:
version_list->createNewEntry(restored_version, r.entry);
if (blob_stats)
blob_stats->restoreByEntry(r.entry);
break;
case EditRecordType::DEL:
case EditRecordType::VAR_DELETE: // nothing different from `DEL`
Expand All @@ -94,8 +108,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
break;
case EditRecordType::UPSERT:
version_list->createNewEntry(restored_version, r.entry);
if (blob_stats)
blob_stats->restoreByEntry(r.entry);
break;
}
}
Expand Down
45 changes: 44 additions & 1 deletion dbms/src/Storages/Page/V3/PageEntriesEdit.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,31 @@ enum class EditRecordType
VAR_DELETE,
};

/// Return a short label for an `EditRecordType`, intended for debug output.
///
/// @param t  the record type to describe.
/// @return   a pointer to a string literal; the caller must not free it.
///           If `t` holds a value that is not one of the enumerators handled
///           below, the label "INVALID" is returned.
inline const char * typeToString(EditRecordType t)
{
    // Deliberately no `default:` label, so the compiler can still warn (-Wswitch)
    // when a new enumerator is added without a matching case here.
    switch (t)
    {
    case EditRecordType::PUT:
        return "PUT ";
    case EditRecordType::PUT_EXTERNAL:
        return "EXT ";
    case EditRecordType::REF:
        return "REF ";
    case EditRecordType::DEL:
        return "DEL ";
    case EditRecordType::UPSERT:
        return "UPSERT ";
    case EditRecordType::VAR_ENTRY:
        return "VAR_ENT";
    case EditRecordType::VAR_REF:
        return "VAR_REF";
    case EditRecordType::VAR_EXTERNAL:
        return "VAR_EXT";
    case EditRecordType::VAR_DELETE:
        return "VAR_DEL";
    }
    // An enum object can hold a value outside its enumerators (e.g. one
    // deserialized from a corrupted log). Flowing off the end of a non-void
    // function is undefined behaviour, so return a sentinel label instead.
    return "INVALID";
}

/// Page entries change to apply to PageDirectory
class PageEntriesEdit
{
Expand Down Expand Up @@ -176,10 +201,28 @@ class PageEntriesEdit
PageIdV3Internal ori_page_id;
PageVersionType version;
PageEntryV3 entry;
Int64 being_ref_count = 1;
Int64 being_ref_count;

// Default constructor: both page ids start at 0 and `being_ref_count` starts at 1.
// Members not listed in the initializer list (`type`, `version`, `entry`) are not
// set here; they get whatever their own default initialization provides.
EditRecord()
: page_id(0)
, ori_page_id(0)
, being_ref_count(1)
{}
};
using EditRecords = std::vector<EditRecord>;

/// Format `rec` as a one-line `{key:value, ...}` string for debugging.
/// Covers the record type label, both page ids, the version, the entry
/// (formatted by `DB::PS::V3::toDebugString`) and the ref count.
static String toDebugString(const EditRecord & rec)
{
    const char * type_label = typeToString(rec.type);
    const auto entry_desc = DB::PS::V3::toDebugString(rec.entry);
    return fmt::format(
        "{{type:{}, page_id:{}, ori_id:{}, version:{}, entry:{}, being_ref_count:{}}}",
        type_label,
        rec.page_id,
        rec.ori_page_id,
        rec.version,
        entry_desc,
        rec.being_ref_count);
}

void appendRecord(const EditRecord & rec)
{
records.emplace_back(rec);
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,18 @@ WALStoreReader::findCheckpoint(LogFilenameSet && all_files)
LogFilename latest_checkpoint = *latest_checkpoint_iter;
for (auto iter = all_files.cbegin(); iter != all_files.cend(); /*empty*/)
{
if (iter->log_num < latest_checkpoint.log_num)
// We use <largest_num, 1> as the checkpoint, so all files less than or equal
// to latest_checkpoint.log_num can be erased
if (iter->log_num <= latest_checkpoint.log_num)
{
// TODO: clean useless file that is older than `checkpoint`
if (iter->log_num == latest_checkpoint.log_num && iter->level_num != 0)
{
// this is the checkpoint file, do not remove it
}
else
{
// TODO: clean useless file that is older than `checkpoint`
}
iter = all_files.erase(iter);
}
else
Expand Down Expand Up @@ -186,6 +195,7 @@ bool WALStoreReader::openNextFile()
if (!checkpoint_read_done)
{
do_open(*checkpoint_file);
checkpoint_read_done = true;
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/WAL/serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void serializePutTo(const PageEntriesEdit::EditRecord & record, WriteBuffer & bu
{
assert(record.type == EditRecordType::PUT || record.type == EditRecordType::UPSERT || record.type == EditRecordType::VAR_ENTRY);

writeIntBinary(EditRecordType::PUT, buf);
writeIntBinary(record.type, buf);

UInt32 flags = 0;
writeIntBinary(flags, buf);
Expand Down
29 changes: 27 additions & 2 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
#include <Poco/Path.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V3/LogFile/LogFilename.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
Expand Down Expand Up @@ -114,10 +115,22 @@ std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(

WALStore::FilesSnapshot WALStore::getFilesSnapshot() const
{
const auto current_writting_log_num = [this]() {
const auto [ok, current_writting_log_num] = [this]() -> std::tuple<bool, Format::LogNumberType> {
std::lock_guard lock(log_file_mutex);
return log_file->logNumber();
if (!log_file)
{
return {false, 0};
}
return {true, log_file->logNumber()};
}();
// Return empty set if `log_file` is not ready
if (!ok)
{
return WALStore::FilesSnapshot{
.current_writting_log_num = 0,
.persisted_log_files = {},
};
}

// Only those files are totally persisted
LogFilenameSet persisted_log_files = WALStoreReader::listAllFiles(delegator, logger);
Expand Down Expand Up @@ -163,12 +176,24 @@ bool WALStore::saveSnapshot(FilesSnapshot && files_snap, PageEntriesEdit && dire
LOG_FMT_INFO(logger, "Rename log file to normal done [fullname={}]", normal_fullname);
}

// #define ARCHIVE_COMPACTED_LOGS // keep for debug

// Remove compacted log files.
for (const auto & filename : files_snap.persisted_log_files)
{
if (auto f = Poco::File(filename.fullname(LogFileStage::Normal)); f.exists())
{
#ifndef ARCHIVE_COMPACTED_LOGS
f.remove();
#else
const Poco::Path archive_path(delegator->defaultPath(), "archive");
Poco::File archive_dir(archive_path);
if (!archive_dir.exists())
archive_dir.createDirectory();
auto dest = archive_path.toString() + "/" + filename.filename(LogFileStage::Normal);
f.moveTo(dest);
LOG_FMT_INFO(logger, "archive {} to {}", filename.fullname(LogFileStage::Normal), dest);
#endif
}
}
// TODO: Log more information. duration, num entries, size of compact log file...
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/tests/entries_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ inline ::testing::AssertionResult getEntriesCompare(
{
// not the expected entry we want
String err_msg;
auto expect_expr = fmt::format("Entry at {} [index={}]", idx);
auto expect_expr = fmt::format("Entry at {} [index={}]", idx, idx);
auto actual_expr = fmt::format("Get entries {} from {} with snap {} [index={}", page_ids_expr, dir_expr, snap_expr, idx);
return testing::internal::EqFailure(
expect_expr.c_str(),
Expand Down
Loading

0 comments on commit 3d75154

Please sign in to comment.