From 3ede2df9fbaf4f524876d954c3f0b60b456c6ad4 Mon Sep 17 00:00:00 2001 From: "lionel.liu@vesoft.com" <52276794+liuyu85cn@users.noreply.github.com> Date: Mon, 18 Oct 2021 14:03:38 +0800 Subject: [PATCH] Accumulate toss bug fix during test. (#3091) * add some debug info * accumulate bug fix for TOSS Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> --- src/clients/storage/InternalStorageClient.cpp | 2 +- src/kvstore/NebulaStore.cpp | 13 ++++ src/kvstore/NebulaStore.h | 5 +- src/kvstore/Part.cpp | 21 +++++- src/kvstore/Part.h | 9 ++- src/kvstore/raftex/RaftPart.cpp | 4 ++ .../ChainAddEdgesProcessorLocal.cpp | 67 +++++++++++++++---- .../transaction/ChainAddEdgesProcessorLocal.h | 3 +- .../ChainAddEdgesProcessorRemote.cpp | 20 ++++-- .../ChainAddEdgesProcessorRemote.h | 3 + .../transaction/ChainResumeProcessor.cpp | 13 ++-- src/storage/transaction/ConsistUtil.cpp | 7 +- src/storage/transaction/ConsistUtil.h | 2 +- .../transaction/ResumeAddEdgeProcessor.cpp | 8 +-- .../ResumeAddEdgeRemoteProcessor.cpp | 3 +- .../transaction/TransactionManager.cpp | 51 ++++++++++---- src/storage/transaction/TransactionManager.h | 2 + 17 files changed, 179 insertions(+), 54 deletions(-) diff --git a/src/clients/storage/InternalStorageClient.cpp b/src/clients/storage/InternalStorageClient.cpp index 00b6d01ffab..00ef000c4fc 100644 --- a/src/clients/storage/InternalStorageClient.cpp +++ b/src/clients/storage/InternalStorageClient.cpp @@ -95,7 +95,7 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq, } HostAddr& leader = optLeader.value(); leader.port += kInternalPortOffset; - VLOG(1) << "leader host: " << leader; + VLOG(2) << "leader host: " << leader; cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion); auto resp = getResponse( diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 5ac276adebd..250e263cb22 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -1192,5 +1192,18 @@ ErrorOr NebulaStore::getProperty( return folly::toJson(obj); } +void NebulaStore::registerOnNewPartAdded( + const std::string& funcName, + std::function&)> func, + std::vector>& existParts) { + for (auto& item : spaces_) { + for (auto& partItem : item.second->parts_) { + existParts.emplace_back(std::make_pair(item.first, partItem.first)); + func(partItem.second); + } + } + onNewPartAdded_.insert(std::make_pair(funcName, func)); +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index c888a22618c..0b1715015ca 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -279,9 +279,8 @@ class NebulaStore : public KVStore, public Handler { ErrorOr getProperty(GraphSpaceID spaceId, const std::string& property) override; void registerOnNewPartAdded(const std::string& funcName, - std::function&)> func) { - onNewPartAdded_.insert(std::make_pair(funcName, func)); - } + std::function&)> func, + std::vector>& existParts); void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); } diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 2fc49382be6..edee5511b92 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -172,7 +172,18 @@ void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) { void Part::setBlocking(bool sign) { blocking_ = sign; } -void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for the term " << term; } +void Part::onLostLeadership(TermID term) { + VLOG(1) << "Lost the leadership for the term " << term; + + CallbackOptions opt; + opt.spaceId = spaceId_; + opt.partId = partId_; + opt.term = term_; + + for (auto& cb : leaderLostCB_) { + cb(opt); + } +} void Part::onElected(TermID term) { VLOG(1) << "Being elected as the leader for the term: " << term; @@ -191,7 +202,9 @@ void Part::onLeaderReady(TermID term) { } } -void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); } +void Part::registerOnLeaderReady(LeaderChagneCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); } + +void Part::registerOnLeaderLost(LeaderChagneCB cb) { leaderLostCB_.emplace_back(std::move(cb)); } void Part::onDiscoverNewLeader(HostAddr nLeader) { LOG(INFO) << idStr_ << "Find the new leader " << nLeader; @@ -231,6 +244,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr iter, bool wait) { // Make the number of values are an even number DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2); for (size_t i = 0; i < kvs.size(); i += 2) { + VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i]) + << ", val = " << folly::hexlify(kvs[i + 1]); auto code = batch->put(kvs[i], kvs[i + 1]); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()"; @@ -272,6 +287,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr iter, bool wait) { case OP_BATCH_WRITE: { auto data = decodeBatchValue(log); for (auto& op : data) { + VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first) + << ", val=" << folly::hexlify(op.second.second); auto code = nebula::cpp2::ErrorCode::SUCCEEDED; if (op.first == BatchLogType::OP_BATCH_PUT) { code = batch->put(op.second.first, op.second.second); diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index 7b03ae79e9a..b7ca0e801dc 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -116,15 +116,18 @@ class Part : public raftex::RaftPart { TermID term; }; - using LeaderReadyCB = std::function; - void registerOnLeaderReady(LeaderReadyCB cb); + using LeaderChagneCB = std::function; + void registerOnLeaderReady(LeaderChagneCB cb); + + void registerOnLeaderLost(LeaderChagneCB cb); protected: GraphSpaceID spaceId_; PartitionID partId_; std::string walPath_; NewLeaderCallback newLeaderCb_ = nullptr; - std::vector leaderReadyCB_; + std::vector leaderReadyCB_; + std::vector leaderLostCB_; private: KVEngine* engine_ = nullptr; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index bdb289363d1..5228d6dbd7a 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -19,6 +19,7 @@ #include "common/thrift/ThriftClientManager.h" #include "common/time/WallClock.h" #include "interface/gen-cpp2/RaftexServiceAsyncClient.h" +#include "kvstore/LogEncoder.h" #include "kvstore/raftex/Host.h" #include "kvstore/raftex/LogStrListIterator.h" #include "kvstore/wal/FileBasedWal.h" @@ -1335,6 +1336,9 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req, << " i did not commit when i was leader, rollback to " << lastLogId_; wal_->rollbackToLog(lastLogId_); } + if (role_ == Role::LEADER) { + bgWorkers_->addTask([self = shared_from_this(), term] { self->onLostLeadership(term); }); + } role_ = Role::FOLLOWER; votedAddr_ = candidate; proposedTerm_ = req.get_term(); diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp index 8ac4d12994a..9c3cef073e8 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp +++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp @@ -47,19 +47,26 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::prepareLocal() { auto [pro, fut] = folly::makePromiseContract(); auto primes = makePrime(); + std::vector debugPrimes; if (FLAGS_trace_toss) { - for (auto& kv : primes) { - VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first); - } + debugPrimes = primes; } erasePrime(); env_->kvstore_->asyncMultiPut( - spaceId_, localPartId_, std::move(primes), [p = std::move(pro), this](auto rc) mutable { + spaceId_, + localPartId_, + std::move(primes), + [p = std::move(pro), debugPrimes, this](auto rc) mutable { if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { primeInserted_ = true; + if (FLAGS_trace_toss) { + for (auto& kv : debugPrimes) { + VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first); + } + } } else { - LOG(WARNING) << "kvstore err: " << apache::thrift::util::enumNameSafe(rc); + LOG(WARNING) << uuid_ << "kvstore err: " << apache::thrift::util::enumNameSafe(rc); } p.setValue(rc); @@ -85,10 +92,14 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::processLocal(Code code) { VLOG(1) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code); } + bool remoteFailed{true}; + if (code == Code::SUCCEEDED) { // do nothing + remoteFailed = false; } else if (code == Code::E_RPC_FAILURE) { code_ = Code::SUCCEEDED; + remoteFailed = false; } else { code_ = code; } @@ -106,7 +117,7 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::processLocal(Code code) { if (code_ == Code::SUCCEEDED) { return forwardToDelegateProcessor(); } else { - if (primeInserted_) { + if (primeInserted_ && remoteFailed) { return abort(); } } @@ -142,7 +153,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re pushResultCode(nebula::error(part), localPartId_); return false; } - localTerm_ = (nebula::value(part))->termId(); + restrictTerm_ = (nebula::value(part))->termId(); auto vidLen = env_->schemaMan_->getSpaceVidLen(spaceId_); if (!vidLen.ok()) { @@ -164,7 +175,13 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::forwardToDelegateProcessor( auto [pro, fut] = folly::makePromiseContract(); std::move(futProc).thenValue([&, p = std::move(pro)](auto&& resp) mutable { auto rc = extractRpcError(resp); - if (rc != Code::SUCCEEDED) { + if (rc == Code::SUCCEEDED) { + if (FLAGS_trace_toss) { + for (auto& k : kvErased_) { + VLOG(1) << uuid_ << " erase prime " << folly::hexlify(k); + } + } + } else { VLOG(1) << uuid_ << " forwardToDelegateProcessor(), code = " << apache::thrift::util::enumNameSafe(rc); addUnfinishedEdge(ResumeType::RESUME_CHAIN); @@ -194,7 +211,7 @@ void ChainAddEdgesProcessorLocal::doRpc(folly::Promise&& promise, auto* iClient = env_->txnMan_->getInternalClient(); folly::Promise p; auto f = p.getFuture(); - iClient->chainAddEdges(req, localTerm_, edgeVer_, std::move(p)); + iClient->chainAddEdges(req, restrictTerm_, edgeVer_, std::move(p)); std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable { auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE; @@ -229,14 +246,26 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::abort() { if (kvErased_.empty()) { return Code::SUCCEEDED; } + + std::vector debugErased; + if (FLAGS_trace_toss) { + debugErased = kvErased_; + } + auto [pro, fut] = folly::makePromiseContract(); env_->kvstore_->asyncMultiRemove( req_.get_space_id(), localPartId_, std::move(kvErased_), - [p = std::move(pro), this](auto rc) mutable { + [p = std::move(pro), debugErased, this](auto rc) mutable { VLOG(1) << uuid_ << " abort()=" << apache::thrift::util::enumNameSafe(rc); - if (rc != Code::SUCCEEDED) { + if (rc == Code::SUCCEEDED) { + if (FLAGS_trace_toss) { + for (auto& k : debugErased) { + VLOG(1) << uuid_ << "erase prime " << folly::hexlify(k); + } + } + } else { addUnfinishedEdge(ResumeType::RESUME_CHAIN); } p.setValue(rc); @@ -313,9 +342,19 @@ bool ChainAddEdgesProcessorLocal::lockEdges(const cpp2::AddEdgesRequest& req) { bool ChainAddEdgesProcessorLocal::checkTerm(const cpp2::AddEdgesRequest& req) { auto space = req.get_space_id(); auto partId = req.get_parts().begin()->first; - auto ret = env_->txnMan_->checkTerm(space, partId, localTerm_); - LOG_IF(WARNING, !ret) << "check term failed, localTerm_ = " << localTerm_; - return ret; + + auto part = env_->kvstore_->part(space, partId); + if (!nebula::ok(part)) { + pushResultCode(nebula::error(part), localPartId_); + return false; + } + auto curTerm = (nebula::value(part))->termId(); + if (restrictTerm_ != curTerm) { + VLOG(1) << folly::sformat( + "check term failed, restrictTerm_={}, currTerm={}", restrictTerm_, curTerm); + return false; + } + return true; } // check if current edge is not newer than the one trying to resume. diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.h b/src/storage/transaction/ChainAddEdgesProcessorLocal.h index 4fe6d5d81b2..56dc2873972 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorLocal.h +++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.h @@ -133,7 +133,8 @@ class ChainAddEdgesProcessorLocal : public BaseProcessor, cpp2::AddEdgesRequest req_; std::unique_ptr lk_{nullptr}; int retryLimit_{10}; - TermID localTerm_{-1}; + // need to restrict all the phase in the same term. + TermID restrictTerm_{-1}; // set to true when prime insert succeed // in processLocal(), we check this to determine if need to do abort() bool primeInserted_{false}; diff --git a/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp b/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp index ee71ad3a569..4df94de42e9 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp +++ b/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp @@ -14,12 +14,16 @@ namespace nebula { namespace storage { void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req) { - VLOG(1) << this << ConsistUtil::dumpParts(req.get_parts()); + if (FLAGS_trace_toss) { + uuid_ = ConsistUtil::strUUID(); + } + VLOG(1) << uuid_ << ConsistUtil::dumpParts(req.get_parts()); auto partId = req.get_parts().begin()->first; auto code = nebula::cpp2::ErrorCode::SUCCEEDED; do { if (!checkTerm(req)) { - LOG(WARNING) << "invalid term, incoming part " << partId << ", term = " << req.get_term(); + LOG(WARNING) << uuid_ << " invalid term, incoming part " << partId + << ", term = " << req.get_term(); code = nebula::cpp2::ErrorCode::E_OUTDATED_TERM; break; } @@ -35,6 +39,13 @@ void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req } while (0); if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { + if (FLAGS_trace_toss) { + // need to do this after set spaceVidLen_ + auto keys = getStrEdgeKeys(req); + for (auto& key : keys) { + LOG(INFO) << uuid_ << ", key = " << folly::hexlify(key); + } + } forwardRequest(req); } else { pushResultCode(code, partId); @@ -53,13 +64,14 @@ void ChainAddEdgesProcessorRemote::forwardRequest(const cpp2::ChainAddEdgesReque proc->getFuture().thenValue([=](auto&& resp) { Code rc = Code::SUCCEEDED; for (auto& part : resp.get_result().get_failed_parts()) { + rc = part.code; handleErrorCode(part.code, spaceId, part.get_part_id()); } - VLOG(1) << this << " " << apache::thrift::util::enumNameSafe(rc); + VLOG(1) << uuid_ << " " << apache::thrift::util::enumNameSafe(rc); this->result_ = resp.get_result(); this->onFinished(); }); - proc->process(ConsistUtil::makeDirectAddReq(req)); + proc->process(ConsistUtil::toAddEdgesRequest(req)); } bool ChainAddEdgesProcessorRemote::checkVersion(const cpp2::ChainAddEdgesRequest& req) { diff --git a/src/storage/transaction/ChainAddEdgesProcessorRemote.h b/src/storage/transaction/ChainAddEdgesProcessorRemote.h index 19b795b71d4..8718e7cd5ca 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorRemote.h +++ b/src/storage/transaction/ChainAddEdgesProcessorRemote.h @@ -30,6 +30,9 @@ class ChainAddEdgesProcessorRemote : public BaseProcessor { void forwardRequest(const cpp2::ChainAddEdgesRequest& req); std::vector getStrEdgeKeys(const cpp2::ChainAddEdgesRequest& req); + + private: + std::string uuid_; // for debug purpose }; } // namespace storage diff --git a/src/storage/transaction/ChainResumeProcessor.cpp b/src/storage/transaction/ChainResumeProcessor.cpp index da4812709df..ede78d95eab 100644 --- a/src/storage/transaction/ChainResumeProcessor.cpp +++ b/src/storage/transaction/ChainResumeProcessor.cpp @@ -23,18 +23,23 @@ void ChainResumeProcessor::process() { auto edgeKey = std::string(it->first.c_str() + sizeof(GraphSpaceID), it->first.size() - sizeof(GraphSpaceID)); auto partId = NebulaKeyUtils::getPart(edgeKey); - VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId - << ", hex=" << folly::hexlify(edgeKey); auto prefix = (it->second == ResumeType::RESUME_CHAIN) ? ConsistUtil::primeTable() : ConsistUtil::doublePrimeTable(); auto key = prefix + edgeKey; std::string val; auto rc = env_->kvstore_->get(spaceId, partId, key, &val); + VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId + << ", hex = " << folly::hexlify(edgeKey) + << ", rc = " << apache::thrift::util::enumNameSafe(rc); if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { // do nothing } else if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - // not leader any more, stop trying resume - env_->txnMan_->delPrime(spaceId, edgeKey); + VLOG(1) << "kvstore->get() leader changed"; + auto getPart = env_->kvstore_->part(spaceId, partId); + if (nebula::ok(getPart) && !nebula::value(getPart)->isLeader()) { + // not leader any more, stop trying resume + env_->txnMan_->delPrime(spaceId, edgeKey); + } continue; } else { LOG(WARNING) << "kvstore->get() failed, " << apache::thrift::util::enumNameSafe(rc); diff --git a/src/storage/transaction/ConsistUtil.cpp b/src/storage/transaction/ConsistUtil.cpp index 84f3812433b..7d0c2865d93 100644 --- a/src/storage/transaction/ConsistUtil.cpp +++ b/src/storage/transaction/ConsistUtil.cpp @@ -132,7 +132,7 @@ int64_t ConsistUtil::getTimestamp(const std::string& val) noexcept { return *reinterpret_cast(val.data() + (val.size() - sizeof(int64_t))); } -cpp2::AddEdgesRequest ConsistUtil::makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req) { +cpp2::AddEdgesRequest ConsistUtil::toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req) { cpp2::AddEdgesRequest ret; ret.set_space_id(req.get_space_id()); ret.set_parts(req.get_parts()); @@ -177,6 +177,11 @@ std::pair ConsistUtil::versionOfUpdateReq( std::string ConsistUtil::dumpAddEdgeReq(const cpp2::AddEdgesRequest& req) { std::stringstream oss; + oss << "prop_names.size() = " << req.get_prop_names().size() << " "; + for (auto& name : req.get_prop_names()) { + oss << name << " "; + } + oss << " "; for (auto& part : req.get_parts()) { // oss << dumpParts(part.second); for (auto& edge : part.second) { diff --git a/src/storage/transaction/ConsistUtil.h b/src/storage/transaction/ConsistUtil.h index 814885b6ddd..27c687011e3 100644 --- a/src/storage/transaction/ConsistUtil.h +++ b/src/storage/transaction/ConsistUtil.h @@ -91,7 +91,7 @@ class ConsistUtil final { static int64_t getTimestamp(const std::string& val) noexcept; - static cpp2::AddEdgesRequest makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req); + static cpp2::AddEdgesRequest toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req); static cpp2::EdgeKey reverseEdgeKey(const cpp2::EdgeKey& edgeKey); diff --git a/src/storage/transaction/ResumeAddEdgeProcessor.cpp b/src/storage/transaction/ResumeAddEdgeProcessor.cpp index a005be49627..1e0d9a931a5 100644 --- a/src/storage/transaction/ResumeAddEdgeProcessor.cpp +++ b/src/storage/transaction/ResumeAddEdgeProcessor.cpp @@ -53,11 +53,6 @@ folly::SemiFuture ResumeAddEdgeProcessor::processLocal(Code code) { return Code::E_OUTDATED_TERM; } - if (!checkVersion(req_)) { - LOG(WARNING) << this << "E_OUTDATED_EDGE"; - return Code::E_OUTDATED_EDGE; - } - if (code == Code::E_RPC_FAILURE) { kvAppend_ = ChainAddEdgesProcessorLocal::makeDoublePrime(); } @@ -66,7 +61,8 @@ folly::SemiFuture ResumeAddEdgeProcessor::processLocal(Code code) { // if there are something wrong other than rpc failure // we need to keep the resume retry(by not remove those prime key) erasePrime(); - return ChainAddEdgesProcessorLocal::forwardToDelegateProcessor(); + code_ = forwardToDelegateProcessor().get(); + return code_; } return code; diff --git a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp b/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp index 38e18431c78..b706d687237 100644 --- a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp +++ b/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp @@ -72,7 +72,8 @@ folly::SemiFuture ResumeAddEdgeRemoteProcessor::processLocal(Code code) { // if there are something wrong other than rpc failure // we need to keep the resume retry(by not remove those prime key) ChainAddEdgesProcessorLocal::eraseDoublePrime(); - return forwardToDelegateProcessor(); + code_ = forwardToDelegateProcessor().get(); + return code_; } return code; diff --git a/src/storage/transaction/TransactionManager.cpp b/src/storage/transaction/TransactionManager.cpp index a1c716df319..d23ab910df3 100644 --- a/src/storage/transaction/TransactionManager.cpp +++ b/src/storage/transaction/TransactionManager.cpp @@ -23,13 +23,17 @@ DEFINE_int32(resume_interval_secs, 10, "Resume interval"); ProcessorCounters kForwardTranxCounters; TransactionManager::TransactionManager(StorageEnv* env) : env_(env) { + LOG(INFO) << "TransactionManager ctor()"; exec_ = std::make_shared(10); iClient_ = env_->interClient_; resumeThread_ = std::make_unique(); - scanAll(); + std::vector> existParts; auto fn = std::bind(&TransactionManager::onNewPartAdded, this, std::placeholders::_1); static_cast<::nebula::kvstore::NebulaStore*>(env_->kvstore_) - ->registerOnNewPartAdded("TransactionManager", fn); + ->registerOnNewPartAdded("TransactionManager", fn, existParts); + for (auto& partOfSpace : existParts) { + scanPrimes(partOfSpace.first, partOfSpace.second); + } } TransactionManager::LockCore* TransactionManager::getLockCore(GraphSpaceID spaceId, @@ -37,10 +41,7 @@ TransactionManager::LockCore* TransactionManager::getLockCore(GraphSpaceID space bool checkWhiteList) { if (checkWhiteList) { if (whiteListParts_.find(std::make_pair(spaceId, partId)) == whiteListParts_.end()) { - LOG(WARNING) << folly::sformat("space {}, part {} not in white list", spaceId, partId); - scanPrimes(spaceId, partId); - auto key = std::make_pair(spaceId, partId); - whiteListParts_.insert(std::make_pair(key, 0)); + return nullptr; } } auto it = memLocks_.find(spaceId); @@ -61,8 +62,8 @@ bool TransactionManager::checkTerm(GraphSpaceID spaceId, PartitionID partId, Ter if (termOfMeta.ok()) { if (term < termOfMeta.value()) { LOG(WARNING) << "checkTerm() failed: " - << "spaceId=" << spaceId << ", partId=" << partId << ", expect term=" << term - << ", actual term=" << termOfMeta.value(); + << "spaceId=" << spaceId << ", partId=" << partId << ", in-coming term=" << term + << ", term in meta cache=" << termOfMeta.value(); return false; } } @@ -151,13 +152,26 @@ void TransactionManager::scanAll() { void TransactionManager::onNewPartAdded(std::shared_ptr& part) { LOG(INFO) << folly::sformat("space={}, part={} added", part->spaceId(), part->partitionId()); - auto fn = std::bind(&TransactionManager::onLeaderElectedWrapper, this, std::placeholders::_1); - part->registerOnLeaderReady(fn); + auto fnLeaderReady = + std::bind(&TransactionManager::onLeaderElectedWrapper, this, std::placeholders::_1); + auto fnLeaderLost = + std::bind(&TransactionManager::onLeaderLostWrapper, this, std::placeholders::_1); + part->registerOnLeaderReady(fnLeaderReady); + part->registerOnLeaderLost(fnLeaderLost); +} + +void TransactionManager::onLeaderLostWrapper(const ::nebula::kvstore::Part::CallbackOptions& opt) { + LOG(INFO) << folly::sformat("leader lost, del space={}, part={}, term={} from white list", + opt.spaceId, + opt.partId, + opt.term); + whiteListParts_.erase(std::make_pair(opt.spaceId, opt.partId)); } void TransactionManager::onLeaderElectedWrapper( const ::nebula::kvstore::Part::CallbackOptions& opt) { - LOG(INFO) << folly::sformat("onLeaderElectedWrapper space={}, part={}", opt.spaceId, opt.partId); + LOG(INFO) << folly::sformat( + "leader get do scanPrimes space={}, part={}, term={}", opt.spaceId, opt.partId, opt.term); scanPrimes(opt.spaceId, opt.partId); } @@ -181,6 +195,11 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) { LOG(ERROR) << "not supposed to lock fail: " << folly::hexlify(edgeKey); } } + } else { + VLOG(1) << "primePrefix() " << apache::thrift::util::enumNameSafe(rc); + if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + return; + } } prefix = ConsistUtil::doublePrimePrefix(partId); rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter); @@ -192,16 +211,22 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) { if (!insSucceed.second) { LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey); } - auto* lk = getLockCore(spaceId, partId); + auto* lk = getLockCore(spaceId, partId, false); auto succeed = lk->try_lock(edgeKey.str()); if (!succeed) { LOG(ERROR) << "not supposed to lock fail: " << folly::hexlify(edgeKey); } } + } else { + VLOG(1) << "doublePrimePrefix() " << apache::thrift::util::enumNameSafe(rc); + if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + return; + } } auto partOfSpace = std::make_pair(spaceId, partId); auto insRet = whiteListParts_.insert(std::make_pair(partOfSpace, 0)); - LOG(ERROR) << "insert space=" << spaceId << ", part=" << partId << ", suc=" << insRet.second; + LOG(INFO) << "insert space=" << spaceId << ", part=" << partId + << ", into white list suc=" << insRet.second; } folly::ConcurrentHashMap* TransactionManager::getReserveTable() { diff --git a/src/storage/transaction/TransactionManager.h b/src/storage/transaction/TransactionManager.h index 76d5a35826f..7e8ff8f53db 100644 --- a/src/storage/transaction/TransactionManager.h +++ b/src/storage/transaction/TransactionManager.h @@ -90,6 +90,8 @@ class TransactionManager { // this is a callback register to Part::onElected void onLeaderElectedWrapper(const ::nebula::kvstore::Part::CallbackOptions& options); + void onLeaderLostWrapper(const ::nebula::kvstore::Part::CallbackOptions& options); + protected: using PartUUID = std::pair;