Skip to content

Commit

Permalink
UniPS: Provide more information when meets exception in updateLocalCa…
Browse files Browse the repository at this point in the history
…cheForRemotePage (pingcap#9311) (pingcap#265)

Signed-off-by: Calvin Neo <[email protected]>
Co-authored-by: JaySon-Huang <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 13, 2024
1 parent c8cdd39 commit 3835e2a
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 53 deletions.
23 changes: 12 additions & 11 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,18 @@ namespace DB
M(delta_tree_create_node_fail) \
M(disable_flush_cache)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task) \
M(pause_before_make_non_root_mpp_task_active)
#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task) \
M(pause_before_make_non_root_mpp_task_active) \
M(pause_before_page_dir_update_local_cache)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
Expand Down
79 changes: 55 additions & 24 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ namespace FailPoints
{
extern const char random_slow_page_storage_remove_expired_snapshots[];
extern const char pause_before_full_gc_prepare[];
extern const char pause_before_page_dir_update_local_cache[];
} // namespace FailPoints

namespace ErrorCodes
Expand Down Expand Up @@ -328,13 +329,24 @@ bool VersionedPageEntries<Trait>::updateLocalCacheForRemotePage(const PageVersio
if (type == EditRecordType::VAR_ENTRY)
{
auto last_iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1, 0));
RUNTIME_CHECK_MSG(last_iter != entries.end() && last_iter->second.isEntry(), "{}", toDebugString());
RUNTIME_CHECK_MSG(
last_iter != entries.end() && last_iter->second.isEntry(),
"this={}, entries={}, ver={}, entry={}",
toDebugString(),
entries,
ver,
entry);

auto & ori_entry = last_iter->second.entry.value();
RUNTIME_CHECK_MSG(ori_entry.checkpoint_info.has_value(), "{}", toDebugString());

// maybe another thread has in-place update the local blob location, ignored
if (!ori_entry.checkpoint_info.is_local_data_reclaimed)
{
return false;
}

// update the entry in-place
ori_entry.file_id = entry.file_id;
ori_entry.size = entry.size;
ori_entry.offset = entry.offset;
Expand Down Expand Up @@ -1748,6 +1760,7 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::updateLocalCach
const DB::PageStorageSnapshotPtr & snap_,
const WriteLimiterPtr & write_limiter)
{
FAIL_POINT_PAUSE(FailPoints::pause_before_page_dir_update_local_cache);
std::unique_lock apply_lock(apply_mutex);
auto seq = toConcreteSnapshot(snap_)->sequence;
for (auto & r : edit.getMutRecords())
Expand All @@ -1761,35 +1774,53 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::updateLocalCach

for (const auto & r : edit.getRecords())
{
auto id_to_resolve = r.page_id;
auto sequence_to_resolve = seq;
while (true)
try
{
auto iter = mvcc_table_directory.lower_bound(id_to_resolve);
assert(iter != mvcc_table_directory.end());
auto & version_list = iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
auto id_to_resolve = r.page_id;
auto sequence_to_resolve = seq;
while (true)
{
if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry))
auto iter = mvcc_table_directory.lower_bound(id_to_resolve);
assert(iter != mvcc_table_directory.end());
auto & version_list = iter->second;
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = version_list->resolveToPageId(
sequence_to_resolve,
/*ignore_delete=*/id_to_resolve != r.page_id,
nullptr);
if (resolve_state == ResolveResult::TO_NORMAL)
{
ignored_entries.push_back(r.entry);
if (!version_list->updateLocalCacheForRemotePage(PageVersion(sequence_to_resolve, 0), r.entry))
{
// The entry is not valid for updating the version_list.
// Caller should notice these part of "ignored_entries" and release
// the space allocated for these invalid entries.
// For the information persisted in WAL, it should be ignored when
// restoring from disk.
ignored_entries.push_back(r.entry);
}
break;
}
else if (resolve_state == ResolveResult::TO_REF)
{
id_to_resolve = next_id_to_resolve;
sequence_to_resolve = next_ver_to_resolve.sequence;
}
else
{
RUNTIME_CHECK(false);
}
break;
}
else if (resolve_state == ResolveResult::TO_REF)
{
id_to_resolve = next_id_to_resolve;
sequence_to_resolve = next_ver_to_resolve.sequence;
}
else
{
RUNTIME_CHECK(false);
}
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format(
" type={}, page_id={}, ver={}, seq={}",
magic_enum::enum_name(r.type),
r.page_id,
r.version,
seq));
throw e;
}
}
}
return ignored_entries;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,14 @@ void UniversalPageStorage::unregisterUniversalExternalPagesCallbacks(const Strin

void UniversalPageStorage::tryUpdateLocalCacheForRemotePages(UniversalWriteBatch & wb, SnapshotPtr snapshot) const
{
// store the downloaded page data into local cache and generate new "edit"
auto edit = blob_store->write(std::move(wb));
// Update the entries to the location of BlobFile.
auto ignored_entries = page_directory->updateLocalCacheForRemotePages(std::move(edit), snapshot);
if (!ignored_entries.empty())
{
// Some entries are not valid for updating the page_directory. BlobStore should
// release the space for new blob data.
blob_store->remove(ignored_entries);
}
}
Expand Down
Loading

0 comments on commit 3835e2a

Please sign in to comment.