Skip to content

Commit

Permalink
Fix deadlock on removeExpiredSnapshots (pingcap#2461) (pingcap#2567)
Browse files Browse the repository at this point in the history
* cherry pick pingcap#2461 to release-4.0

Co-authored-by: JaySon <[email protected]>
Co-authored-by: JaySon-Huang <[email protected]>
  • Loading branch information
3 people authored Aug 10, 2021
1 parent 8df4323 commit 20238d3
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 26 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
M(DictCacheRequests) \
M(Revision) \
M(PSMVCCNumSnapshots) \
M(PSMVCCSnapshotsList) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Storages/Page/VersionSet/PageEntriesVersionSet.h>
#include <Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.h>

#include <stack>

#ifdef FIU_ENABLE
#include <Common/randomSeed.h>

#include <pcg_random.hpp>
#include <thread>
#endif


namespace CurrentMetrics
{
extern const Metric PSMVCCSnapshotsList;
} // namespace CurrentMetrics

namespace DB
{
namespace FailPoints
{
extern const char random_slow_page_storage_list_all_live_files[];
} // namespace FailPoints

//==========================================================================================
// PageEntriesVersionSetWithDelta
Expand Down Expand Up @@ -42,9 +61,11 @@ std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>>
PageEntriesVersionSetWithDelta::listAllLiveFiles(std::unique_lock<std::shared_mutex> && lock, bool need_scan_page_ids)
{
/// Collect live files is costly, we save SnapshotPtrs and scan them without lock.
(void)lock; // Note read_write_mutex must be hold.
// Note read_write_mutex must be hold.
std::vector<SnapshotPtr> valid_snapshots;
const size_t snapshots_size_before_clean = snapshots.size();
const size_t snapshots_size_before_clean = snapshots.size();
double longest_living_seconds = 0.0;
unsigned longest_living_from_thread_id = 0;
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
{
auto snapshot_or_invalid = iter->lock();
Expand All @@ -55,23 +76,40 @@ PageEntriesVersionSetWithDelta::listAllLiveFiles(std::unique_lock<std::shared_mu
}
else
{
// Save valid snapshot.
valid_snapshots.emplace_back(snapshot_or_invalid);
fiu_do_on(FailPoints::random_slow_page_storage_list_all_live_files, {
pcg64 rng(randomSeed());
std::chrono::milliseconds ms{std::uniform_int_distribution(0, 900)(rng)}; // 0~900 milliseconds
std::this_thread::sleep_for(ms);
});
const auto snapshot_lifetime = snapshot_or_invalid->elapsedSeconds();
if (snapshot_lifetime > longest_living_seconds)
{
longest_living_seconds = snapshot_lifetime;
longest_living_from_thread_id = snapshot_or_invalid->t_id;
}
valid_snapshots.emplace_back(snapshot_or_invalid); // Save valid snapshot and release them without lock later
iter++;
}
}
// Create a temporary latest snapshot by using `current`
valid_snapshots.emplace_back(std::make_shared<Snapshot>(this, current));

lock.unlock(); // Notice: unlock
lock.unlock(); // Notice: unlock and we should free those valid snapshots without locking

// Plus 1 for eliminating the counting of temporary snapshot of `current`
const size_t num_invalid_snapshot_to_clean = snapshots_size_before_clean + 1 - valid_snapshots.size();
if (num_invalid_snapshot_to_clean > 0)
{
LOG_DEBUG(log,
name << " gcApply remove " + DB::toString(snapshots_size_before_clean + 1 - valid_snapshots.size())
+ " invalid snapshots.");
CurrentMetrics::sub(CurrentMetrics::PSMVCCSnapshotsList, num_invalid_snapshot_to_clean);
std::stringstream ss;
ss << name << " gcApply remove " << num_invalid_snapshot_to_clean << " invalid snapshots, " << valid_snapshots.size()
<< " snapshots left, longest lifetime " << DB::toString(longest_living_seconds, 3) << " seconds, created from thread_id "
<< longest_living_from_thread_id;
constexpr double EXIST_STALE_SNAPSHOT = 60.0;
if (longest_living_seconds > EXIST_STALE_SNAPSHOT)
LOG_WARNING(log, ss.str());
else
LOG_DEBUG(log, ss.str());
}
// Iterate all snapshots to collect all PageFile in used.
std::set<PageFileIdAndLevel> live_files;
Expand Down
104 changes: 86 additions & 18 deletions dbms/src/Storages/Page/mvcc/VersionSetWithDelta.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
#pragma once

#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Common/ProfileEvents.h>
#include <IO/WriteHelpers.h>
#include <Poco/Ext/ThreadNumber.h>
#include <Storages/Page/mvcc/VersionSet.h>
#include <stdint.h>

#include <boost/core/noncopyable.hpp>
#include <cassert>
#include <chrono>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stack>
#include <unordered_set>

#ifdef FIU_ENABLE
#include <Common/randomSeed.h>

#include <pcg_random.hpp>
#include <thread>
#endif

namespace ProfileEvents
{
extern const Event PSMVCCCompactOnDelta;
Expand All @@ -28,10 +38,17 @@ extern const Event PSMVCCApplyOnNewDelta;
namespace CurrentMetrics
{
extern const Metric PSMVCCNumSnapshots;
extern const Metric PSMVCCSnapshotsList;
} // namespace CurrentMetrics

namespace DB
{

namespace FailPoints
{
extern const char random_slow_page_storage_remove_expired_snapshots[];
} // namespace FailPoints

namespace MVCC
{
/// Base type for VersionType of VersionSetWithDelta
Expand All @@ -48,7 +65,7 @@ struct MultiVersionCountableForDelta
};

// TODO: Merge `VersionSetWithDelta` with `PageEntriesVersionSetWithDelta`, template make things
// more complicated and hard to understand.
// more complicated and hard to understand.
//
/// \tparam TVersion -- Single version on version-list. Require for a `prev` member, see `MultiVersionDeltaCountable`
/// \tparam TVersionView -- A view to see a list of versions as a single version
Expand Down Expand Up @@ -80,8 +97,7 @@ class VersionSetWithDelta
{
current.reset();

std::unique_lock lock(read_write_mutex);
removeExpiredSnapshots(lock);
removeExpiredSnapshots();

// snapshot list is empty
assert(snapshots.empty());
Expand Down Expand Up @@ -126,8 +142,15 @@ class VersionSetWithDelta
VersionSetWithDelta * vset;
TVersionView view;

using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;
const unsigned t_id;

private:
const TimePoint create_time;

public:
Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_) : vset(vset_), view(std::move(tail_))
Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_)
: vset(vset_), view(std::move(tail_)), t_id(Poco::ThreadNumber::get()), create_time(std::chrono::steady_clock::now())
{
CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots);
}
Expand All @@ -145,6 +168,14 @@ class VersionSetWithDelta

const TVersionView * version() const { return &view; }

// The time this snapshot living for
double elapsedSeconds() const
{
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> diff = end - create_time;
return diff.count();
}

template <typename V, typename VV, typename VE, typename B>
friend class VersionSetWithDelta;
};
Expand All @@ -165,6 +196,7 @@ class VersionSetWithDelta
// Do not call `vset->removeExpiredSnapshots` inside `~Snapshot`, or it may cause incursive deadlock
// on `vset->read_write_mutex`.
snapshots.emplace_back(SnapshotWeakPtr(s));
CurrentMetrics::add(CurrentMetrics::PSMVCCSnapshotsList);
return s;
}

Expand Down Expand Up @@ -291,20 +323,55 @@ class VersionSetWithDelta
}

private:
void removeExpiredSnapshots(const std::unique_lock<std::shared_mutex> &) const
// Scan over all `snapshots`, remove the invalid snapshots and get some statistics
// of all living snapshots and the oldest living snapshot.
// Return < num of snapshots,
// living time(seconds) of the oldest snapshot,
// created thread id of the oldest snapshot >
std::tuple<size_t, double, unsigned> removeExpiredSnapshots() const
{
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
// Notice: we should free those valid snapshots without locking, or it may cause
// incursive deadlock on `vset->read_write_mutex`.
std::vector<SnapshotPtr> valid_snapshots;
double longest_living_seconds = 0.0;
unsigned longest_living_from_thread_id = 0;
DB::Int64 num_snapshots_removed = 0;
{
if (iter->expired())
std::unique_lock lock(read_write_mutex);
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
{
// Clear free snapshots
iter = snapshots.erase(iter);
auto snapshot_or_invalid = iter->lock();
if (snapshot_or_invalid == nullptr)
{
// Clear expired snapshots weak_ptrs
iter = snapshots.erase(iter);
num_snapshots_removed += 1;
}
else
{
fiu_do_on(FailPoints::random_slow_page_storage_remove_expired_snapshots, {
pcg64 rng(randomSeed());
std::chrono::milliseconds ms{std::uniform_int_distribution(0, 900)(rng)}; // 0~900 milliseconds
std::this_thread::sleep_for(ms);
});
const auto snapshot_lifetime = snapshot_or_invalid->elapsedSeconds();
if (snapshot_lifetime > longest_living_seconds)
{
longest_living_seconds = snapshot_lifetime;
longest_living_from_thread_id = snapshot_or_invalid->t_id;
}
valid_snapshots.emplace_back(snapshot_or_invalid); // Save valid snapshot and release them without lock later
iter++;
}
}
else
{
iter++;
}
}
} // unlock `read_write_mutex`

const size_t num_valid_snapshots = valid_snapshots.size();
valid_snapshots.clear();

CurrentMetrics::sub(CurrentMetrics::PSMVCCSnapshotsList, num_snapshots_removed);
// Return some statistics of the oldest living snapshot.
return {num_valid_snapshots, longest_living_seconds, longest_living_from_thread_id};
}

public:
Expand All @@ -328,10 +395,11 @@ class VersionSetWithDelta

size_t numSnapshots() const
{
// Note: this will scan and remove expired weak_ptr to snapshot
std::unique_lock lock(read_write_mutex);
removeExpiredSnapshots(lock);
return snapshots.size();
// Note: this will scan and remove expired weak_ptrs from `snapshots`
size_t num_snapshots = 0;

std::tie(num_snapshots, std::ignore, std::ignore) = removeExpiredSnapshots();
return num_snapshots;
}

std::string toDebugString() const
Expand Down

0 comments on commit 20238d3

Please sign in to comment.