Skip to content

Commit

Permalink
Fix some bugs on PageDirectory
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Mar 14, 2022
1 parent 1479d4a commit e19947b
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 26 deletions.
29 changes: 11 additions & 18 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ PageSize VersionedPageEntries::getEntriesByBlobIds(

bool VersionedPageEntries::cleanOutdatedEntries(
UInt64 lowest_seq,
PageIdV3Internal page_id,
std::map<PageIdV3Internal, std::pair<PageVersionType, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & /*page_lock*/)
Expand All @@ -419,15 +418,6 @@ bool VersionedPageEntries::cleanOutdatedEntries(
// need to decrease the ref count by <id=iter->second.origin_page_id, ver=iter->first, num=1>
if (auto [deref_counter, new_created] = normal_entries_to_deref->emplace(std::make_pair(ori_page_id, std::make_pair(/*ver=*/create_ver, /*count=*/1))); !new_created)
{
if (deref_counter->second.first.sequence != create_ver.sequence)
{
throw Exception(fmt::format(
"There exist two different version of ref, should not happen [page_id={}] [ori_page_id={}] [ver={}] [another_ver={}]",
page_id,
ori_page_id,
create_ver,
deref_counter->second.first));
}
// the id is already exist in deref map, increase the num to decrease ref count
deref_counter->second.second += 1;
}
Expand Down Expand Up @@ -511,7 +501,8 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag
}
else if (type == EditRecordType::VAR_ENTRY)
{
// decrease the ref-counter
// Decrease the ref-counter. The entry may be moved to a newer entry with same sequence but higher epoch,
// so we need to find the one less than <seq+1, 0> and decrease the ref-counter of it.
auto iter = MapUtils::findMutLess(entries, PageVersionType(deref_ver.sequence + 1, 0));
if (iter == entries.end())
{
Expand All @@ -533,7 +524,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag

// Clean outdated entries after decreased the ref-counter
// set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries
return cleanOutdatedEntries(lowest_seq, page_id, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock);
}

throw Exception(fmt::format("calling derefAndClean with invalid state [state={}]", toDebugString()));
Expand Down Expand Up @@ -866,7 +857,10 @@ void PageDirectory::applyRefEditRecord(
const VersionedPageEntriesPtr & resolve_version_list = resolve_ver_iter->second;
// If we already hold the lock from `id_to_resolve`, then we should not request it again.
// This can happen when `id_to_resolve` have other operating in current writebatch
auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId(ver_to_resolve.sequence, false, nullptr);
auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId(
ver_to_resolve.sequence,
/*check_prev=*/true,
nullptr);
switch (need_collapse)
{
case VersionedPageEntries::RESOLVE_FAIL:
Expand Down Expand Up @@ -969,7 +963,7 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write
case EditRecordType::VAR_ENTRY:
case EditRecordType::VAR_EXTERNAL:
case EditRecordType::VAR_REF:
throw Exception(fmt::format("should not handle {} edit", r.type));
throw Exception(fmt::format("should not handle edit with invalid type [type={}]", r.type));
}
}
catch (DB::Exception & e)
Expand Down Expand Up @@ -1003,7 +997,7 @@ void PageDirectory::gcApply(PageEntriesEdit && migrated_edit, const WriteLimiter
iter = mvcc_table_directory.find(record.page_id);
if (unlikely(iter == mvcc_table_directory.end()))
{
throw Exception(fmt::format("Can't found [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Can't find [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
}
} // release the read lock on `table_rw_mutex`

Expand Down Expand Up @@ -1129,7 +1123,6 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
// do gc on the version list without lock on `mvcc_table_directory`.
const bool all_deleted = iter->second->cleanOutdatedEntries(
lowest_seq,
/*page_id=*/iter->first,
&normal_entries_to_deref,
all_del_entries,
iter->second->acquireLock());
Expand Down Expand Up @@ -1164,8 +1157,8 @@ PageEntriesV3 PageDirectory::gcInMemEntries()
const bool all_deleted = iter->second->derefAndClean(
lowest_seq,
page_id,
deref_counter.first,
deref_counter.second,
/*deref_ver=*/deref_counter.first,
/*deref_count=*/deref_counter.second,
all_del_entries);

if (all_deleted)
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Common/CurrentMetrics.h>
#include <Common/LogWithPrefix.h>
#include <Encryption/FileProvider.h>
#include <Poco/Ext/ThreadNumber.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/Snapshot.h>
Expand All @@ -17,8 +18,6 @@
#include <shared_mutex>
#include <unordered_map>

#include "Encryption/FileProvider.h"

namespace CurrentMetrics
{
extern const Metric PSMVCCNumSnapshots;
Expand Down Expand Up @@ -205,7 +204,6 @@ class VersionedPageEntries
*/
bool cleanOutdatedEntries(
UInt64 lowest_seq,
PageIdV3Internal page_id,
std::map<PageIdV3Internal, std::pair<PageVersionType, Int64>> * normal_entries_to_deref,
PageEntriesV3 & entries_removed,
const PageLock & page_lock);
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,16 +613,12 @@ CATCH
class VersionedEntriesTest : public ::testing::Test
{
public:
void SetUp() override
{
}

using DerefCounter = std::map<PageIdV3Internal, std::pair<PageVersionType, Int64>>;
std::tuple<bool, PageEntriesV3, DerefCounter> runClean(UInt64 seq)
{
DerefCounter deref_counter;
PageEntriesV3 removed_entries;
bool all_removed = entries.cleanOutdatedEntries(seq, buildV3Id(TEST_NAMESPACE_ID, page_id), &deref_counter, removed_entries, entries.acquireLock());
bool all_removed = entries.cleanOutdatedEntries(seq, &deref_counter, removed_entries, entries.acquireLock());
return {all_removed, removed_entries, deref_counter};
}

Expand Down

0 comments on commit e19947b

Please sign in to comment.