Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UniPS: Provide more information when meets exception in updateLocalCacheForRemotePage #9311

Merged
merged 13 commits into from
Aug 12, 2024
77 changes: 52 additions & 25 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ namespace PS::V3
template <typename Trait>
PageLock VersionedPageEntries<Trait>::acquireLock() const NO_THREAD_SAFETY_ANALYSIS
{
return std::lock_guard(m);
return std::lock_guard{m};
}

template <typename Trait>
Expand Down Expand Up @@ -337,7 +337,21 @@ bool VersionedPageEntries<Trait>::updateLocalCacheForRemotePage(const PageVersio
if (type == EditRecordType::VAR_ENTRY)
{
auto last_iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1, 0));
RUNTIME_CHECK_MSG(last_iter != entries.end() && last_iter->second.isEntry(), "{}", toDebugString());
if unlikely (last_iter == entries.end() || !last_iter->second.isEntry())
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
{
FmtBuffer buf;
for (const auto & e : entries)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also have a question: what's the size of entries normally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my case, the size is 2

{
buf.fmtAppend("{}|", e);
}
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"this={}, entries={}, ver={}, entry={}",
toDebugString(),
buf.toString(),
ver,
entry);
}
auto & ori_entry = last_iter->second.entry.value();
RUNTIME_CHECK_MSG(ori_entry.checkpoint_info.has_value(), "{}", toDebugString());
if (!ori_entry.checkpoint_info.is_local_data_reclaimed)
Expand Down Expand Up @@ -1797,35 +1811,48 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::updateLocalCach

for (const auto & r : edit.getRecords())
{
auto id_to_resolve = r.page_id;
auto sequence_to_resolve = seq;
while (true)
try
{
auto iter = mvcc_table_directory.lower_bound(id_to_resolve);
assert(iter != mvcc_table_directory.end());
auto & version_list = iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
auto id_to_resolve = r.page_id;
auto sequence_to_resolve = seq;
while (true)
{
if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry))
auto iter = mvcc_table_directory.lower_bound(id_to_resolve);
assert(iter != mvcc_table_directory.end());
auto & version_list = iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
{
ignored_entries.push_back(r.entry);
if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry))
{
ignored_entries.push_back(r.entry);
}
break;
}
else if (resolve_state == ResolveResult::TO_REF)
{
id_to_resolve = next_id_to_resolve;
sequence_to_resolve = next_ver_to_resolve.sequence;
}
else
{
RUNTIME_CHECK(false);
}
break;
}
else if (resolve_state == ResolveResult::TO_REF)
{
id_to_resolve = next_id_to_resolve;
sequence_to_resolve = next_ver_to_resolve.sequence;
}
else
{
RUNTIME_CHECK(false);
}
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format(
" type={}, page_id={}, ver={}, seq={}",
magic_enum::enum_name(r.type),
r.page_id,
r.version,
seq));
throw e;
}
}
}
return ignored_entries;
Expand Down