Skip to content

Commit

Permalink
[bug fix] Need to wait for previous recover task finished. (#3877)
Browse files Browse the repository at this point in the history
* check if there is existing recover task, before add prime

* address some comments from Panda

* fix a ut compile failed

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
liuyu85cn and Sophie-Xie authored Mar 17, 2022
1 parent 43a447f commit 9dbe5b0
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 49 deletions.
4 changes: 4 additions & 0 deletions src/common/utils/MemoryLockCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class MemoryLockCore {
return hashMap_.size();
}

/**
 * @brief Check whether `key` currently has an entry in hashMap_.
 *
 * @param key Key to look up.
 * @return true if hashMap_ holds an entry for `key`, false otherwise.
 */
bool contains(const Key& key) {
  // find() yields end() when the key is absent, so presence is "!= end()".
  return hashMap_.find(key) != hashMap_.end();
}

protected:
folly::ConcurrentHashMap<Key, int> hashMap_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/ChainTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TransactionManagerTester {
man_->stop();
int32_t numCheckIdle = 0;
while (numCheckIdle < 3) {
auto stats = man_->exec_->getPoolStats();
auto stats = man_->worker_->getPoolStats();
if (stats.threadCount == stats.idleThreadCount) {
++numCheckIdle;
} else {
Expand Down
112 changes: 67 additions & 45 deletions src/storage/transaction/TransactionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ DEFINE_int32(toss_worker_num, 16, "Resume interval");

// Constructs the manager: stores the storage environment pointer, logs the
// construction, and creates the IO thread-pool executors it uses.
// NOTE(review): both `exec_` and `worker_` are assigned a pool sized by
// FLAGS_toss_worker_num here. This text looks like a rendered diff in which
// the `exec_` line is the pre-change version of the `worker_` line - confirm
// which assignment is meant to remain.
TransactionManager::TransactionManager(StorageEnv* env) : env_(env) {
LOG(INFO) << "TransactionManager ctor()";
exec_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_toss_worker_num);
worker_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_toss_worker_num);
// Separate executor constructed with an argument of 1; waitUntil() submits
// its polling tasks to it.
controller_ = std::make_shared<folly::IOThreadPoolExecutor>(1);
}

bool TransactionManager::start() {
Expand Down Expand Up @@ -72,14 +73,15 @@ void TransactionManager::stop() {

// Logs the call and stops the thread-pool executors owned by the manager.
// NOTE(review): `exec_->stop()` and `worker_->stop()` both appear; this looks
// like the old and new line of a rendered diff - confirm which is intended.
void TransactionManager::join() {
LOG(INFO) << "TransactionManager join()";
exec_->stop();
worker_->stop();
// Also stop the executor that runs the waitUntil() polling tasks.
controller_->stop();
}

void TransactionManager::addChainTask(ChainBaseProcessor* proc) {
if (stop_) {
return;
}
folly::via(exec_.get())
folly::via(worker_.get())
.thenValue([=](auto&&) { return proc->prepareLocal(); })
.thenValue([=](auto&& code) { return proc->processRemote(code); })
.thenValue([=](auto&& code) { return proc->processLocal(code); })
Expand Down Expand Up @@ -192,13 +194,18 @@ void TransactionManager::onLeaderLostWrapper(const ::nebula::kvstore::Part::Call
opt.spaceId,
opt.partId,
opt.term);
auto currTermKey = std::make_pair(opt.spaceId, opt.partId);
auto currTermIter = currTerm_.find(currTermKey);
if (currTermIter == currTerm_.end()) {
return;
// clean up out-dated items in the memory locks
for (auto cit = memLocks_.cbegin(); cit != memLocks_.cend();) {
auto& [spaceId, partId, termId] = cit->first;
if (spaceId == opt.spaceId && partId == opt.partId && termId < opt.term) {
auto sptrLockCore = cit->second;
if (sptrLockCore->size() == 0) {
cit = memLocks_.erase(cit);
continue;
}
}
++cit;
}
auto memLockKey = std::make_tuple(opt.spaceId, opt.partId, currTermIter->second);
memLocks_.erase(memLockKey);
}

void TransactionManager::onLeaderElectedWrapper(
Expand All @@ -211,54 +218,69 @@ void TransactionManager::onLeaderElectedWrapper(
void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId, TermID termId) {
LOG(INFO) << folly::sformat(
"{}(), space={}, part={}, term={}", __func__, spaceId, partId, termId);
std::vector<std::string> prefixVec{ConsistUtil::primePrefix(partId),
ConsistUtil::doublePrimePrefix(partId)};
std::vector<ResumeType> resumeVec{ResumeType::RESUME_CHAIN, ResumeType::RESUME_REMOTE};
auto termKey = std::make_pair(spaceId, partId);
auto itCurrTerm = currTerm_.find(termKey);
TermID prevTerm = itCurrTerm == currTerm_.end() ? -1 : itCurrTerm->second;

std::unique_ptr<kvstore::KVIterator> iter;
auto prefix = ConsistUtil::primePrefix(partId);
auto rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
for (; iter->valid(); iter->next()) {
auto edgeKey = ConsistUtil::edgeKeyFromPrime(iter->key()).str();
VLOG(1) << "scanned prime edge: " << folly::hexlify(edgeKey);
auto lk = getLockCore(spaceId, partId, termId, false);
auto succeed = lk->try_lock(edgeKey);
if (!succeed) {
LOG(ERROR) << "not supposed to lock fail: "
<< ", spaceId " << spaceId << ", partId " << partId << ", termId " << termId
<< folly::hexlify(edgeKey);
}
addPrime(spaceId, partId, termId, edgeKey, ResumeType::RESUME_CHAIN);
}
} else {
VLOG(1) << "primePrefix() " << apache::thrift::util::enumNameSafe(rc);
}
for (auto i = 0U; i != prefixVec.size(); ++i) {
auto rc = env_->kvstore_->prefix(spaceId, partId, prefixVec[i], &iter);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
for (; iter->valid(); iter->next()) {
auto edgeKey = ConsistUtil::edgeKeyFromPrime(iter->key()).str();
VLOG(1) << "scanned prime edge: " << folly::hexlify(edgeKey);
auto lk = getLockCore(spaceId, partId, termId, false);
auto succeed = lk->try_lock(edgeKey);
if (!succeed) {
LOG(ERROR) << "not supposed to lock fail: "
<< ", spaceId " << spaceId << ", partId " << partId << ", termId " << termId
<< folly::hexlify(edgeKey);
}
auto prevMemLockKey = std::make_tuple(spaceId, partId, prevTerm);
auto prevMemLockIter = memLocks_.find(prevMemLockKey);
auto spLock = prevMemLockIter == memLocks_.end() ? nullptr : prevMemLockIter->second;
auto hasUnfinishedTask = [lk = spLock, key = edgeKey] {
return (lk != nullptr) && lk->contains(key);
};

prefix = ConsistUtil::doublePrimePrefix(partId);
rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
for (; iter->valid(); iter->next()) {
auto edgeKey = ConsistUtil::edgeKeyFromDoublePrime(iter->key()).str();
VLOG(1) << "scanned double prime edge: " << folly::hexlify(edgeKey);
auto lk = getLockCore(spaceId, partId, termId, false);
auto succeed = lk->try_lock(edgeKey);
if (!succeed) {
LOG(ERROR) << "not supposed to lock fail: "
<< ", space " << spaceId << ", partId " << partId << ", termId " << termId
<< folly::hexlify(edgeKey);
if (!hasUnfinishedTask()) {
addPrime(spaceId, partId, termId, edgeKey, resumeVec[i]);
} else {
folly::Promise<folly::Unit> pro;
auto fut = pro.getFuture();
std::move(fut).thenValue(
[=](auto&&) { addPrime(spaceId, partId, termId, edgeKey, resumeVec[i]); });
waitUntil(std::move(hasUnfinishedTask), std::move(pro));
}
}
addPrime(spaceId, partId, termId, edgeKey, ResumeType::RESUME_REMOTE);
} else {
VLOG(1) << "primePrefix() " << apache::thrift::util::enumNameSafe(rc);
}
} else {
VLOG(1) << "doublePrimePrefix() " << apache::thrift::util::enumNameSafe(rc);
}

auto currTermKey = std::make_pair(spaceId, partId);
currTerm_.insert_or_assign(currTermKey, termId);
currTerm_.insert_or_assign(termKey, termId);
prevTerms_.insert_or_assign(termKey, prevTerm);

LOG(INFO) << "set curr term spaceId = " << spaceId << ", partId = " << partId
<< ", termId = " << termId;
}

// Returns an EventBase obtained from a thread-pool executor of this manager.
// NOTE(review): two return statements are present; as written only the first
// one (`exec_`) can execute and the `worker_` one is unreachable. This looks
// like the old and new line of a rendered diff - confirm which is intended.
folly::EventBase* TransactionManager::getEventBase() {
return exec_->getEventBase();
return worker_->getEventBase();
}

void TransactionManager::waitUntil(std::function<bool()>&& cond, folly::Promise<folly::Unit>&& p) {
controller_->add([cond = std::move(cond), p = std::move(p), this]() mutable {
if (cond()) {
p.setValue(folly::Unit());
} else {
std::this_thread::sleep_for(std::chrono::seconds(3));
this->waitUntil(std::move(cond), std::move(p));
}
});
}

} // namespace storage
Expand Down
15 changes: 12 additions & 3 deletions src/storage/transaction/TransactionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class TransactionManager {
friend class TransactionManagerTester;
using LockGuard = MemoryLockGuard<std::string>;
using LockCore = MemoryLockCore<std::string>;
using UPtrLock = std::unique_ptr<LockCore>;
using SPtrLock = std::shared_ptr<LockCore>;

public:
Expand Down Expand Up @@ -130,12 +129,20 @@ class TransactionManager {
// this is a callback register to Part::onLostLeadership
void onLeaderLostWrapper(const ::nebula::kvstore::Part::CallbackOptions& options);

void waitUntil(std::function<bool()>&& cond, folly::Promise<folly::Unit>&& p);

protected:
using SpacePart = std::pair<GraphSpaceID, PartitionID>;

StorageEnv* env_{nullptr};
std::shared_ptr<folly::IOThreadPoolExecutor> exec_;
// real executor to run recover / normal jobs
std::shared_ptr<folly::IOThreadPoolExecutor> worker_;
// only used for waiting for some job to stop.
std::shared_ptr<folly::IOThreadPoolExecutor> controller_;

/**
* @brief used for a remote processor to record the term of its "local Processor"
*/
folly::ConcurrentHashMap<SpacePart, TermID> cachedTerms_;

using MemLockKey = std::tuple<GraphSpaceID, PartitionID, TermID>;
Expand All @@ -145,7 +152,9 @@ class TransactionManager {
* @brief every raft part needs to do a scan;
* only scanned parts are allowed to insert edges
*/
folly::ConcurrentHashMap<std::pair<GraphSpaceID, PartitionID>, TermID> currTerm_;
folly::ConcurrentHashMap<SpacePart, TermID> currTerm_;

folly::ConcurrentHashMap<SpacePart, TermID> prevTerms_;
};

} // namespace storage
Expand Down

0 comments on commit 9dbe5b0

Please sign in to comment.