From 87ef2a278c98814b520d6f8f57452b5154cb9f3e Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Wed, 6 Apr 2022 12:54:12 +0800 Subject: [PATCH 1/5] fix apply outdate membership change (#4107) * fix apply outdate membership change * fix transfer leader to '':0 cause crash Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/kvstore/LogEncoder.cpp | 4 +- src/kvstore/Part.cpp | 73 ++++++++++++++++++----- src/meta/processors/admin/AdminClient.cpp | 4 ++ 3 files changed, 63 insertions(+), 18 deletions(-) diff --git a/src/kvstore/LogEncoder.cpp b/src/kvstore/LogEncoder.cpp index db5adfa08df..170bcaad02d 100644 --- a/src/kvstore/LogEncoder.cpp +++ b/src/kvstore/LogEncoder.cpp @@ -217,8 +217,6 @@ decodeBatchValue(folly::StringPiece encoded) { std::string encodeHost(LogType type, const HostAddr& host) { std::string encoded; - // 15 refers to "255.255.255.255" - encoded.reserve(sizeof(int64_t) + 1 + 15 + sizeof(int)); int64_t ts = time::WallClock::fastNowInMilliSec(); std::string encodedHost; apache::thrift::CompactSerializer::serialize(host, &encodedHost); @@ -232,7 +230,7 @@ std::string encodeHost(LogType type, const HostAddr& host) { HostAddr decodeHost(LogType type, const folly::StringPiece& encoded) { HostAddr addr; - CHECK_GE(encoded.size(), sizeof(int64_t) + 1 + sizeof(size_t) + sizeof(Port)); + CHECK_GE(encoded.size(), sizeof(int64_t) + 1); CHECK(encoded[sizeof(int64_t)] == type); folly::StringPiece raw = encoded; diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 6c3a10cab85..92fe487b6c8 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -212,6 +212,8 @@ void Part::onDiscoverNewLeader(HostAddr nLeader) { std::tuple Part::commitLogs( std::unique_ptr iter, bool wait, bool needLock) { + // We should apply any membership change which happens before start time. Because when we start + // up, the peers comes from meta, has already contains all previous changes. SCOPED_TIMER(&execTime_); auto batch = engine_->startBatchWrite(); LogID lastId = kNoCommitLogId; @@ -309,12 +311,26 @@ std::tuple Part::commitLogs( } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - commitTransLeader(newLeader, needLock); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + commitTransLeader(newLeader, needLock); + } else { + VLOG(2) << idStr_ << "Skip commit stale transfer leader " << newLeader + << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " + << ts; + } break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - commitRemovePeer(peer, needLock); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + commitRemovePeer(peer, needLock); + } else { + VLOG(2) << idStr_ << "Skip commit stale remove peer " << peer + << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " + << ts; + } break; } default: { @@ -389,36 +405,63 @@ nebula::cpp2::ErrorCode Part::putCommitMsg(WriteBatch* batch, } bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const std::string& log) { + // We should apply any membership change which happens before start time. Because when we start + // up, the peers comes from meta, has already contains all previous changes. VLOG(4) << idStr_ << "logId " << logId << ", termId " << termId << ", clusterId " << clusterId; if (!log.empty()) { switch (log[sizeof(int64_t)]) { case OP_ADD_LEARNER: { auto learner = decodeHost(OP_ADD_LEARNER, log); - LOG(INFO) << idStr_ << "preprocess add learner " << learner; - addLearner(learner, false); - // persist the part learner info in case of storaged restarting - engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner)); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess add learner " << learner; + addLearner(learner, false); + // persist the part learner info in case of storaged restarting + engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner)); + } else { + VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at " + << startTimeMs_ << ", but the log timestamp is " << ts; + } break; } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader; - preProcessTransLeader(newLeader); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess trans leader " << newLeader; + preProcessTransLeader(newLeader); + } else { + VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader + << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " + << ts; + } break; } case OP_ADD_PEER: { auto peer = decodeHost(OP_ADD_PEER, log); - LOG(INFO) << idStr_ << "preprocess add peer " << peer; - addPeer(peer); - engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer)); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess add peer " << peer; + addPeer(peer); + engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer)); + } else { + VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at " + << startTimeMs_ << ", but the log timestamp is " << ts; + } break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - LOG(INFO) << idStr_ << "preprocess remove peer " << peer; - preProcessRemovePeer(peer); - // remove peer in the persist info - engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted)); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess remove peer " << peer; + preProcessRemovePeer(peer); + // remove peer in the persist info + engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted)); + } else { + VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at " + << startTimeMs_ << ", but the log timestamp is " << ts; + } break; } default: { diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index dbcfdd90a2d..d9e493df81f 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -43,6 +43,7 @@ folly::Future AdminClient::transLeader(GraphSpaceID spaceId, } auto target = dst; if (dst == kRandomPeer) { + // pick a alive host as target when dst not specified for (auto& p : peers) { if (p != src) { auto retCode = ActiveHostsMan::isLived(kv_, p); @@ -53,6 +54,9 @@ folly::Future AdminClient::transLeader(GraphSpaceID spaceId, } } } + if (target == kRandomPeer) { + return Status::Error("No active peers found"); + } req.new_leader_ref() = std::move(target); return getResponseFromPart( Utils::getAdminAddrFromStoreAddr(src), From 1cdddbd4ac9fac8f66e74ad6c18a46c70077fd83 Mon Sep 17 00:00:00 2001 From: "pengwei.song" <90180021+pengweisong@users.noreply.github.com> Date: Wed, 6 Apr 2022 13:42:40 +0800 Subject: [PATCH 2/5] add code for part peers backward compatible (#4101) * add code for part peers backward compatible * add balance keys * fix bug: pass tests * add gflag && ignore * change int to int64_t * chage to emplace Co-authored-by: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/common/utils/NebulaKeyUtils.cpp | 11 ++++ src/common/utils/NebulaKeyUtils.h | 15 +++++ src/common/utils/Types.h | 1 + src/kvstore/Common.h | 77 ++++++++++++++++++------ src/kvstore/KVEngine.h | 9 ++- src/kvstore/NebulaStore.cpp | 88 +++++++++++++++++++++------- src/kvstore/RocksEngine.cpp | 29 ++++++--- src/kvstore/RocksEngine.h | 18 ++++-- src/kvstore/test/NebulaStoreTest.cpp | 5 +- 9 files changed, 194 insertions(+), 59 deletions(-) diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index 31b24fc57ba..d026f3efed0 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -119,6 +119,17 @@ std::string NebulaKeyUtils::systemPartKey(PartitionID partId) { return key; } +// static +std::string NebulaKeyUtils::systemBalanceKey(PartitionID partId) { + uint32_t item = (partId << kPartitionOffset) | static_cast(NebulaKeyType::kSystem); + uint32_t type = static_cast(NebulaSystemKeyType::kSystemBalance); + std::string key; + key.reserve(kSystemLen); + key.append(reinterpret_cast(&item), sizeof(PartitionID)) + .append(reinterpret_cast(&type), sizeof(NebulaSystemKeyType)); + return key; +} + // static std::string NebulaKeyUtils::kvKey(PartitionID partId, const folly::StringPiece& name) { std::string key; diff --git a/src/common/utils/NebulaKeyUtils.h b/src/common/utils/NebulaKeyUtils.h index 17cbd88bccc..6608c4d5bdf 100644 --- a/src/common/utils/NebulaKeyUtils.h +++ b/src/common/utils/NebulaKeyUtils.h @@ -77,6 +77,8 @@ class NebulaKeyUtils final { static std::string systemPartKey(PartitionID partId); + static std::string systemBalanceKey(PartitionID partId); + static std::string kvKey(PartitionID partId, const folly::StringPiece& name); static std::string kvPrefix(PartitionID partId); @@ -189,6 +191,19 @@ class NebulaKeyUtils final { return static_cast(type) == NebulaSystemKeyType::kSystemPart; } + static bool isSystemBalance(const folly::StringPiece& rawKey) { + if (rawKey.size() != kSystemLen) { + return false; + } + if (!isSystem(rawKey)) { + return false; + } + auto position = rawKey.data() + sizeof(PartitionID); + auto len = sizeof(NebulaSystemKeyType); + auto type = readInt(position, len); + return static_cast(type) == NebulaSystemKeyType::kSystemBalance; + } + static VertexIDSlice getSrcId(size_t vIdLen, const folly::StringPiece& rawKey) { if (rawKey.size() < kEdgeLen + (vIdLen << 1)) { dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen); diff --git a/src/common/utils/Types.h b/src/common/utils/Types.h index e365fb4270e..c5a812b9a16 100644 --- a/src/common/utils/Types.h +++ b/src/common/utils/Types.h @@ -26,6 +26,7 @@ enum class NebulaKeyType : uint32_t { enum class NebulaSystemKeyType : uint32_t { kSystemCommit = 0x00000001, kSystemPart = 0x00000002, + kSystemBalance = 0x00000003, }; enum class NebulaOperationType : uint32_t { diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index e2ee85dd030..c97e6c878a7 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -9,12 +9,17 @@ #include #include +#include + #include "common/base/Base.h" #include "common/datatypes/HostAddr.h" #include "common/thrift/ThriftTypes.h" +#include "common/time/WallClock.h" #include "common/utils/Types.h" #include "interface/gen-cpp2/common_types.h" +DECLARE_int64(balance_expired_sesc); + namespace nebula { namespace kvstore { @@ -59,6 +64,7 @@ struct Peer { Status status; Peer() : addr(), status(Status::kNormalPeer) {} + explicit Peer(HostAddr a) : addr(a), status(Status::kNormalPeer) {} Peer(HostAddr a, Status s) : addr(a), status(s) {} std::string toString() const { @@ -106,26 +112,37 @@ inline std::ostream& operator<<(std::ostream& os, const Peer& peer) { struct Peers { private: std::map peers; + int64_t createdTime; public: - Peers() {} + Peers() { + createdTime = time::WallClock::fastNowInSec(); + } explicit Peers(const std::vector& addrs) { // from normal peers for (auto& addr : addrs) { peers[addr] = Peer(addr, Peer::Status::kNormalPeer); } + createdTime = time::WallClock::fastNowInSec(); } explicit Peers(const std::vector& ps) { for (auto& p : ps) { peers[p.addr] = p; } + createdTime = time::WallClock::fastNowInSec(); + } + explicit Peers(std::map ps) : peers(std::move(ps)) { + createdTime = time::WallClock::fastNowInSec(); } - explicit Peers(std::map ps) : peers(std::move(ps)) {} void addOrUpdate(const Peer& peer) { peers[peer.addr] = peer; } - bool get(const HostAddr& addr, Peer* peer) { + bool exist(const HostAddr& addr) const { + return peers.find(addr) != peers.end(); + } + + bool get(const HostAddr& addr, Peer* peer) const { auto it = peers.find(addr); if (it == peers.end()) { return false; @@ -141,7 +158,7 @@ struct Peers { peers.erase(addr); } - size_t size() { + size_t size() const { return peers.size(); } @@ -149,31 +166,53 @@ struct Peers { return peers; } + bool allNormalPeers() const { + for (const auto& [addr, peer] : peers) { + if (peer.status == Peer::Status::kDeleted) { + continue; + } + + if (peer.status != Peer::Status::kNormalPeer) { + return false; + } + } + return true; + } + + bool isExpired() const { + return time::WallClock::fastNowInSec() - createdTime > FLAGS_balance_expired_sesc; + } + + void setCreatedTime(int64_t time) { + createdTime = time; + } + std::string toString() const { std::stringstream os; os << "version:1," - << "count:" << peers.size() << "\n"; + << "count:" << peers.size() << ",ts:" << createdTime << "\n"; for (const auto& [_, p] : peers) { os << p << "\n"; } return os.str(); } - static std::pair extractHeader(const std::string& header) { - auto pos = header.find(":"); - if (pos == std::string::npos) { + static std::tuple extractHeader(const std::string& header) { + std::vector fields; + folly::split(":", header, fields, true); + if (fields.size() != 4) { LOG(INFO) << "Parse part peers header error:" << header; - return {0, 0}; + return {0, 0, 0}; } - int version = std::stoi(header.substr(pos + 1)); - pos = header.find(":", pos + 1); - if (pos == std::string::npos) { - LOG(INFO) << "Parse part peers header error:" << header; - return {0, 0}; - } - int count = std::stoi(header.substr(pos + 1)); - return {version, count}; + int version = std::stoi(fields[1]); + int count = std::stoi(fields[2]); + + int64_t createdTime; + std::istringstream iss(fields[3]); + iss >> createdTime; + + return {version, count, createdTime}; } static Peers fromString(const std::string& str) { @@ -186,7 +225,7 @@ struct Peers { return peers; } - auto [version, count] = extractHeader(lines[0]); + auto [version, count, createdTime] = extractHeader(lines[0]); if (version != 1) { LOG(INFO) << "Wrong peers format version:" << version; return peers; @@ -198,6 +237,8 @@ struct Peers { return peers; } + peers.setCreatedTime(createdTime); + // skip header for (size_t i = 1; i < lines.size(); ++i) { auto& line = lines[i]; diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 5d48206104b..d3a98d044cc 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -228,11 +228,10 @@ class KVEngine { virtual void addPart(PartitionID partId, const Peers& raftPeers) = 0; /** - * @brief Update part info. Could only update the peers' persist info now. + * @brief Update part info. Could only update the persist peers info in balancing now. * * @param partId - * @param raftPeer 1. if raftPeer.status is kDeleted, delete this peer. - * 2. if raftPeer.status is others, add or update this peer + * @param raftPeer */ virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0; @@ -251,11 +250,11 @@ class KVEngine { virtual std::vector allParts() = 0; /** - * @brief Return all partId->raft peers that current storage engine holds. + * @brief Return all balancing partId->raft peers that current storage engine holds. * * @return std::map partId-> raft peers for each part, including learners */ - virtual std::map allPartPeers() = 0; + virtual std::map balancePartPeers() = 0; /** * @brief Return total parts num diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index ee4c08b9151..c5766d582bb 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -13,6 +13,8 @@ #include "common/fs/FileUtils.h" #include "common/network/NetworkUtils.h" +#include "common/time/WallClock.h" +#include "common/utils/NebulaKeyUtils.h" #include "kvstore/NebulaSnapshotManager.h" #include "kvstore/RocksEngine.h" @@ -81,7 +83,8 @@ bool NebulaStore::init() { void NebulaStore::loadPartFromDataPath() { CHECK(!!options_.partMan_); LOG(INFO) << "Scan the local path, and init the spaces_"; - std::unordered_set> spacePartIdSet; + // avoid duplicate engine created + std::unordered_set> partSet; for (auto& path : options_.dataPaths_) { auto rootPath = folly::stringPrintf("%s/nebula", path.c_str()); auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str()); @@ -101,35 +104,78 @@ void NebulaStore::loadPartFromDataPath() { continue; } - // Load raft peers info which persisted to local engine. - // If the partition was in balancing process before restart, we should keep it - // though the part is not in the meta. auto engine = newEngine(spaceId, path, options_.walPath_); std::map partRaftPeers; - for (auto& [partId, raftPeers] : engine->allPartPeers()) { - bool isNormalPeer = true; - - Peer raftPeer; - bool exist = raftPeers.get(raftAddr_, &raftPeer); - if (exist) { - if (raftPeer.status != Peer::Status::kNormalPeer) { - isNormalPeer = false; + + // load balancing part info which persisted to local engine. + for (auto& [partId, raftPeers] : engine->balancePartPeers()) { + CHECK_NE(raftPeers.size(), 0); + + if (raftPeers.isExpired()) { + LOG(INFO) << "Space: " << spaceId << ", part:" << partId + << " balancing info expired, ignore it."; + continue; + } + + auto spacePart = std::make_pair(spaceId, partId); + if (partSet.find(spacePart) == partSet.end()) { + partSet.emplace(std::make_pair(spaceId, partId)); + + // join the balancing peers with meta peers + auto metaStatus = options_.partMan_->partMeta(spaceId, partId); + if (!metaStatus.ok()) { + LOG(INFO) << "Space: " << spaceId << "; partId: " << partId + << " does not exist in part manager when join balancing."; + } else { + auto partMeta = metaStatus.value(); + for (auto& h : partMeta.hosts_) { + auto raftAddr = getRaftAddr(h); + if (!raftPeers.exist(raftAddr)) { + VLOG(1) << "Add raft peer " << raftAddr; + raftPeers.addOrUpdate(Peer(raftAddr)); + } + } } + + partRaftPeers.emplace(partId, raftPeers); + } + } + + // load normal part ids which persisted to local engine. + for (auto& partId : engine->allParts()) { + // first priority: balancing + bool inBalancing = partRaftPeers.find(partId) != partRaftPeers.end(); + if (inBalancing) { + continue; } - if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok() && isNormalPeer) { - LOG(INFO) << "Part " << partId - << " is a normal peer and does not exist in meta any more, will remove it!"; + // second priority: meta + if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok()) { + LOG(INFO) + << "Part " << partId + << " is not in balancing and does not exist in meta any more, will remove it!"; engine->removePart(partId); continue; - } else { - auto spacePart = std::make_pair(spaceId, partId); - if (spacePartIdSet.find(spacePart) == spacePartIdSet.end()) { - spacePartIdSet.emplace(spacePart); - partRaftPeers.emplace(partId, raftPeers); + } + + auto spacePart = std::make_pair(spaceId, partId); + if (partSet.find(spacePart) == partSet.end()) { + partSet.emplace(spacePart); + + // fill the peers + auto metaStatus = options_.partMan_->partMeta(spaceId, partId); + CHECK(metaStatus.ok()); + auto partMeta = metaStatus.value(); + Peers peers; + for (auto& h : partMeta.hosts_) { + VLOG(1) << "Add raft peer " << getRaftAddr(h); + peers.addOrUpdate(Peer(getRaftAddr(h))); } + partRaftPeers.emplace(partId, peers); } } + + // there is no valid part in this engine, remove it if (partRaftPeers.empty()) { engine.reset(); // close engine if (!options_.partMan_->spaceExist(storeSvcAddr_, spaceId).ok()) { @@ -141,7 +187,7 @@ void NebulaStore::loadPartFromDataPath() { continue; } - // add to spaces if the part should exist + // add to spaces KVEngine* enginePtr = nullptr; { folly::RWSpinLock::WriteHolder wh(&lock_); diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index f57ec600b68..7f45f4bb409 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -15,6 +15,9 @@ #include "kvstore/KVStore.h" DEFINE_bool(move_files, false, "Move the SST files instead of copy when ingest into dataset"); +DEFINE_int64(balance_expired_sesc, + 86400, + "The expired time of balancing part info persisted in the storaged"); namespace nebula { namespace kvstore { @@ -320,30 +323,39 @@ std::string RocksEngine::partKey(PartitionID partId) { return NebulaKeyUtils::systemPartKey(partId); } +std::string RocksEngine::balanceKey(PartitionID partId) { + return NebulaKeyUtils::systemBalanceKey(partId); +} + void RocksEngine::addPart(PartitionID partId, const Peers& raftPeers) { - auto ret = put(partKey(partId), raftPeers.toString()); + auto ret = put(partKey(partId), ""); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { partsNum_++; CHECK_GE(partsNum_, 0); } + + if (!raftPeers.allNormalPeers()) { + put(balanceKey(partId), raftPeers.toString()); + } } void RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) { std::string val; - auto ret = get(partKey(partId), &val); + auto ret = get(balanceKey(partId), &val); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Update part failed when get, partId=" << partId; return; } auto peers = Peers::fromString(val); - if (raftPeer.status == Peer::Status::kDeleted) { - peers.remove(raftPeer.addr); + peers.addOrUpdate(raftPeer); + + if (peers.allNormalPeers()) { + ret = remove(balanceKey(partId)); } else { - peers.addOrUpdate(raftPeer); + ret = put(balanceKey(partId), peers.toString()); } - ret = put(partKey(partId), peers.toString()); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Update part failed when put back, partId=" << partId; } @@ -354,6 +366,7 @@ void RocksEngine::removePart(PartitionID partId) { options.disableWAL = FLAGS_rocksdb_disable_wal; std::vector sysKeysToDelete; sysKeysToDelete.emplace_back(partKey(partId)); + sysKeysToDelete.emplace_back(balanceKey(partId)); sysKeysToDelete.emplace_back(NebulaKeyUtils::systemCommitKey(partId)); auto code = multiRemove(sysKeysToDelete); if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -386,7 +399,7 @@ std::vector RocksEngine::allParts() { return parts; } -std::map RocksEngine::allPartPeers() { +std::map RocksEngine::balancePartPeers() { std::unique_ptr iter; std::map partRaftPeers; static const std::string prefixStr = NebulaKeyUtils::systemPrefix(); @@ -397,7 +410,7 @@ std::map RocksEngine::allPartPeers() { while (iter->valid()) { auto key = iter->key(); - if (!NebulaKeyUtils::isSystemPart(key)) { + if (!NebulaKeyUtils::isSystemBalance(key)) { iter->next(); continue; } diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 7384ed218fc..9cc9b057301 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -401,11 +401,11 @@ class RocksEngine : public KVEngine { void addPart(PartitionID partId, const Peers& raftPeers = {}) override; /** - * @brief Update part info. Could only update the persist peers info now. + * @brief Update part info. Could only update the persist peers info in balancing now. * * @param partId - * @param raftPeer 1. if raftPeer.status is kDeleted, delete this peer. - * 2. if raftPeer.status is others, add or update this peer + * @param raftPeer + * */ void updatePart(PartitionID partId, const Peer& raftPeer) override; @@ -424,11 +424,11 @@ class RocksEngine : public KVEngine { std::vector allParts() override; /** - * @brief Retrun all the part->raft peers in rocksdb engine by scanning system part key. + * @brief Retrun all the balancing part->raft peers in rocksdb engine by scanning system part key. * * @return std::map */ - std::map allPartPeers() override; + std::map balancePartPeers() override; /** * @brief Return total partition numbers @@ -528,6 +528,14 @@ class RocksEngine : public KVEngine { */ std::string partKey(PartitionID partId); + /** + * @brief System balance key, containing balancing info + * + * @param partId + * @return std::string + */ + std::string balanceKey(PartitionID partId); + /** * @brief Open the rocksdb backup engine, mainly for rocksdb PlainTable mounted on tmpfs/ramfs * diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index f603e97bd56..c8abdcb7171 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -166,7 +166,7 @@ TEST(NebulaStoreTest, PartsTest) { // disk2: 5, 6, 7, 15 KVOptions options; - options.dataPaths_ = std::move(paths); + options.dataPaths_ = paths; options.partMan_ = std::move(partMan); HostAddr local = {"", 0}; auto store = @@ -291,11 +291,12 @@ TEST(NebulaStoreTest, PersistPeersTest) { // disk2: 5, 6, 7, 15 KVOptions options; - options.dataPaths_ = std::move(paths); + options.dataPaths_ = paths; options.partMan_ = std::move(partMan); auto store = std::make_unique(std::move(options), ioThreadPool, local, getHandlers()); store->init(); + auto check = [&](GraphSpaceID spaceId) { for (int i = 0; i < static_cast(paths.size()); i++) { ASSERT_EQ(folly::stringPrintf("%s/disk%d/nebula/%d", rootPath.path(), i + 1, spaceId), From bc18c23550e65fc1add6bf94841351bb30c5ec11 Mon Sep 17 00:00:00 2001 From: "haifei.zhao" <32253291+zhaohaifei@users.noreply.github.com> Date: Wed, 6 Apr 2022 14:07:30 +0800 Subject: [PATCH 3/5] fixed (#4116) Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/meta/processors/admin/VerifyClientVersionProcessor.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp index df00c1cdbff..ab400bc4cca 100644 --- a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp +++ b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp @@ -31,8 +31,7 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r auto versionVal = MetaKeyUtils::versionVal(req.get_build_version().c_str()); std::vector versionData; versionData.emplace_back(std::move(versionKey), std::move(versionVal)); - doSyncPut(versionData); - resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; + handleErrorCode(doSyncPut(versionData)); } onFinished(); } From a96db3014a83656ab6b9c14485d46d6273911639 Mon Sep 17 00:00:00 2001 From: "haifei.zhao" <32253291+zhaohaifei@users.noreply.github.com> Date: Thu, 7 Apr 2022 14:44:21 +0800 Subject: [PATCH 4/5] move KW_CLEAR to unresolved keyword (#4118) --- .linters/cpp/checkKeyword.py | 1 - scripts/nebula.service | 2 +- src/parser/parser.yy | 1 + src/parser/scanner.lex | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.linters/cpp/checkKeyword.py b/.linters/cpp/checkKeyword.py index bbe3dec3238..4055255051d 100755 --- a/.linters/cpp/checkKeyword.py +++ b/.linters/cpp/checkKeyword.py @@ -75,7 +75,6 @@ 'KW_ADD', 'KW_CREATE', 'KW_DROP', - 'KW_CLEAR', 'KW_REMOVE', 'KW_IF', 'KW_NOT', diff --git a/scripts/nebula.service b/scripts/nebula.service index 98c011b3c59..a61c38e669e 100755 --- a/scripts/nebula.service +++ b/scripts/nebula.service @@ -234,7 +234,7 @@ function status_daemon { port=${GREEN}${port}${NC} else port=${BLINK}${RED}${port}${NC} - if [[$daemon_name == nebula-storaged]]; then + if [[ $daemon_name == nebula-storaged ]]; then WARN "${daemon_name} after v3.0.0 will not start service until it is added to cluster." WARN "See Manage Storage hosts:${RED}ADD HOSTS${NC} in https://docs.nebula-graph.io/" fi diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 7c5d7b7aa90..434fcc3690e 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -576,6 +576,7 @@ unreserved_keyword | KW_MERGE { $$ = new std::string("merge"); } | KW_DIVIDE { $$ = new std::string("divide"); } | KW_RENAME { $$ = new std::string("rename"); } + | KW_CLEAR { $$ = new std::string("clear"); } ; expression diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index ecdeb88b19a..7650feab8f5 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -144,7 +144,6 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}* "ADD" { return TokenType::KW_ADD; } "CREATE" { return TokenType::KW_CREATE;} "DROP" { return TokenType::KW_DROP; } -"CLEAR" { return TokenType::KW_CLEAR; } "REMOVE" { return TokenType::KW_REMOVE; } "IF" { return TokenType::KW_IF; } "NOT" { return TokenType::KW_NOT; } @@ -308,6 +307,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}* "MERGE" { return TokenType::KW_MERGE; } "RENAME" { return TokenType::KW_RENAME; } "DIVIDE" { return TokenType::KW_DIVIDE; } +"CLEAR" { return TokenType::KW_CLEAR; } "TRUE" { yylval->boolval = true; return TokenType::BOOL; } "FALSE" { yylval->boolval = false; return TokenType::BOOL; } From 33b27434f6f629a6efa661f9e4751add770b8ca1 Mon Sep 17 00:00:00 2001 From: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Date: Thu, 7 Apr 2022 18:27:07 +0800 Subject: [PATCH 5/5] Limits for add and recover balance data or zone balance jobs (#4104) * If there are failed or stopped data balance or zone balance job, must firstly recover it * address wenhaocs's comments * address comments * add more comments * recover job * add more ut * adjust comment format * adjust comment format * adjust comment format --- src/clients/meta/MetaClient.cpp | 2 + src/common/graph/Response.h | 2 + src/graph/executor/StorageAccessExecutor.cpp | 1 + .../executor/admin/ChangePasswordExecutor.cpp | 1 + src/graph/executor/admin/ConfigExecutor.cpp | 1 + .../executor/admin/CreateUserExecutor.cpp | 1 + .../executor/admin/DescribeUserExecutor.cpp | 1 + src/graph/executor/admin/DropUserExecutor.cpp | 1 + .../executor/admin/KillQueryExecutor.cpp | 1 + .../executor/admin/ListRolesExecutor.cpp | 1 + .../executor/admin/ListUserRolesExecutor.cpp | 1 + src/graph/executor/admin/PartExecutor.cpp | 1 + .../executor/admin/RevokeRoleExecutor.cpp | 1 + .../executor/admin/ShowHostsExecutor.cpp | 1 + .../executor/admin/ShowMetaLeaderExecutor.cpp | 1 + .../admin/ShowServiceClientsExecutor.cpp | 1 + .../executor/admin/ShowStatsExecutor.cpp | 1 + .../executor/admin/SignInServiceExecutor.cpp | 1 + src/graph/executor/admin/SnapshotExecutor.cpp | 2 + src/graph/executor/logic/ArgumentExecutor.cpp | 1 + src/graph/executor/logic/LoopExecutor.cpp | 2 +- src/graph/executor/logic/SelectExecutor.cpp | 1 + src/graph/executor/maintain/EdgeExecutor.cpp | 1 + .../executor/maintain/FTIndexExecutor.cpp | 1 + src/graph/executor/maintain/TagExecutor.cpp | 1 + src/graph/executor/mutate/DeleteExecutor.cpp | 1 + src/graph/executor/mutate/InsertExecutor.cpp | 1 + .../executor/query/AggregateExecutor.cpp | 1 + src/graph/executor/query/AssignExecutor.cpp | 1 + .../executor/query/DataCollectExecutor.cpp | 1 + src/graph/executor/query/FilterExecutor.cpp | 1 + src/graph/executor/query/GetEdgesExecutor.cpp | 1 + .../executor/query/GetNeighborsExecutor.cpp | 1 + .../executor/query/IndexScanExecutor.cpp | 1 + .../executor/query/InnerJoinExecutor.cpp | 1 + .../executor/query/IntersectExecutor.cpp | 1 + src/graph/executor/query/JoinExecutor.cpp | 1 + src/graph/executor/query/LeftJoinExecutor.cpp | 1 + src/graph/executor/query/LimitExecutor.cpp | 1 + src/graph/executor/query/MinusExecutor.cpp | 1 + src/graph/executor/query/ProjectExecutor.cpp | 1 + src/graph/executor/query/SampleExecutor.cpp | 1 + .../executor/query/ScanEdgesExecutor.cpp | 1 + src/graph/executor/query/SetExecutor.cpp | 1 + src/graph/executor/query/SortExecutor.cpp | 1 + src/graph/executor/query/TopNExecutor.cpp | 1 + src/graph/executor/query/TraverseExecutor.cpp | 1 + .../query/UnionAllVersionVarExecutor.cpp | 1 + src/graph/executor/query/UnionExecutor.cpp | 1 + src/graph/executor/query/UnwindExecutor.cpp | 1 + src/interface/common.thrift | 1 + src/meta/processors/job/AdminJobProcessor.cpp | 12 +- src/meta/processors/job/JobDescription.h | 3 +- src/meta/processors/job/JobManager.cpp | 170 +++++++++++++++++- src/meta/processors/job/JobManager.h | 25 ++- src/meta/test/JobManagerTest.cpp | 133 +++++++++++--- 56 files changed, 359 insertions(+), 39 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 0371034b818..d57d6012ea3 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -915,6 +915,8 @@ Status MetaClient::handleResponse(const RESP& resp) { return Status::Error("No valid job!"); case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE: return Status::Error("Job not existed in chosen space!"); + case nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER: + return Status::Error("Need to recover failed data balance job or zone balance job firstly!"); case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE: return Status::Error("Backup empty table!"); case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED: diff --git a/src/common/graph/Response.h b/src/common/graph/Response.h index 7c4679ef35f..abf151811dc 100644 --- a/src/common/graph/Response.h +++ b/src/common/graph/Response.h @@ -106,6 +106,8 @@ X(E_BALANCER_FAILURE, -2047) \ X(E_JOB_NOT_FINISHED, -2048) \ X(E_TASK_REPORT_OUT_DATE, -2049) \ + X(E_JOB_NOT_IN_SPACE, -2050) \ + X(E_JOB_NEED_RECOVER, -2051) \ X(E_INVALID_JOB, -2065) \ \ /* Backup Failure */ \ diff --git a/src/graph/executor/StorageAccessExecutor.cpp b/src/graph/executor/StorageAccessExecutor.cpp index c315cdde008..00c29e5226e 100644 --- a/src/graph/executor/StorageAccessExecutor.cpp +++ b/src/graph/executor/StorageAccessExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/StorageAccessExecutor.h" #include + #include "graph/context/Iterator.h" #include "graph/context/QueryExpressionContext.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/admin/ChangePasswordExecutor.cpp b/src/graph/executor/admin/ChangePasswordExecutor.cpp index 8e7c4f8d86a..da5ac0b1761 100644 --- a/src/graph/executor/admin/ChangePasswordExecutor.cpp +++ b/src/graph/executor/admin/ChangePasswordExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/ChangePasswordExecutor.h" #include + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/admin/ConfigExecutor.cpp b/src/graph/executor/admin/ConfigExecutor.cpp index 3d0525baddf..1b91e4ff177 100644 --- a/src/graph/executor/admin/ConfigExecutor.cpp +++ b/src/graph/executor/admin/ConfigExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/ConfigExecutor.h" #include + #include "common/conf/Configuration.h" #include "graph/planner/plan/Admin.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/admin/CreateUserExecutor.cpp b/src/graph/executor/admin/CreateUserExecutor.cpp index b5e9c29c2ca..34d9984c780 100644 --- a/src/graph/executor/admin/CreateUserExecutor.cpp +++ b/src/graph/executor/admin/CreateUserExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/CreateUserExecutor.h" #include + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/admin/DescribeUserExecutor.cpp b/src/graph/executor/admin/DescribeUserExecutor.cpp index 6353287f985..e1a3f8e473e 100644 --- a/src/graph/executor/admin/DescribeUserExecutor.cpp +++ b/src/graph/executor/admin/DescribeUserExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/DescribeUserExecutor.h" #include + #include "graph/planner/plan/Admin.h" #include "interface/gen-cpp2/meta_types.h" diff --git a/src/graph/executor/admin/DropUserExecutor.cpp b/src/graph/executor/admin/DropUserExecutor.cpp index 21051b6e125..23015981870 100644 --- a/src/graph/executor/admin/DropUserExecutor.cpp +++ b/src/graph/executor/admin/DropUserExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/admin/DropUserExecutor.h" + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/admin/KillQueryExecutor.cpp b/src/graph/executor/admin/KillQueryExecutor.cpp index 47628803d55..55d827c6316 100644 --- a/src/graph/executor/admin/KillQueryExecutor.cpp +++ b/src/graph/executor/admin/KillQueryExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/admin/KillQueryExecutor.h" + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/admin/ListRolesExecutor.cpp b/src/graph/executor/admin/ListRolesExecutor.cpp index 17d6b22f85b..b2e3639ec80 100644 --- a/src/graph/executor/admin/ListRolesExecutor.cpp +++ b/src/graph/executor/admin/ListRolesExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/ListRolesExecutor.h" #include + #include "graph/planner/plan/Admin.h" #include "graph/service/PermissionManager.h" diff --git a/src/graph/executor/admin/ListUserRolesExecutor.cpp b/src/graph/executor/admin/ListUserRolesExecutor.cpp index 201a3b8fcf4..e5eefb02135 100644 --- a/src/graph/executor/admin/ListUserRolesExecutor.cpp +++ b/src/graph/executor/admin/ListUserRolesExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/ListUserRolesExecutor.h" #include + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/admin/PartExecutor.cpp b/src/graph/executor/admin/PartExecutor.cpp index dfd88907206..5868b0e6d40 100644 --- a/src/graph/executor/admin/PartExecutor.cpp +++ b/src/graph/executor/admin/PartExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/admin/PartExecutor.h" + #include "graph/planner/plan/Admin.h" using nebula::network::NetworkUtils; diff --git a/src/graph/executor/admin/RevokeRoleExecutor.cpp b/src/graph/executor/admin/RevokeRoleExecutor.cpp index be59f42166a..3182d75eb40 100644 --- a/src/graph/executor/admin/RevokeRoleExecutor.cpp +++ b/src/graph/executor/admin/RevokeRoleExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/admin/RevokeRoleExecutor.h" + #include "graph/planner/plan/Admin.h" #include "graph/service/PermissionManager.h" #include "interface/gen-cpp2/meta_types.h" diff --git a/src/graph/executor/admin/ShowHostsExecutor.cpp b/src/graph/executor/admin/ShowHostsExecutor.cpp index 23860c5fa3d..999214c858c 100644 --- a/src/graph/executor/admin/ShowHostsExecutor.cpp +++ b/src/graph/executor/admin/ShowHostsExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/ShowHostsExecutor.h" #include + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/admin/ShowMetaLeaderExecutor.cpp b/src/graph/executor/admin/ShowMetaLeaderExecutor.cpp index 21a43387a22..86a95241915 100644 --- a/src/graph/executor/admin/ShowMetaLeaderExecutor.cpp +++ b/src/graph/executor/admin/ShowMetaLeaderExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/ShowMetaLeaderExecutor.h" #include + #include "common/time/TimeUtils.h" #include "graph/planner/plan/Admin.h" diff --git a/src/graph/executor/admin/ShowServiceClientsExecutor.cpp b/src/graph/executor/admin/ShowServiceClientsExecutor.cpp index 25f7425de20..651763f2dc3 100644 --- a/src/graph/executor/admin/ShowServiceClientsExecutor.cpp +++ b/src/graph/executor/admin/ShowServiceClientsExecutor.cpp @@ -5,6 +5,7 @@ #include "graph/executor/admin/ShowServiceClientsExecutor.h" #include + #include "graph/planner/plan/Admin.h" #include "graph/service/PermissionManager.h" #include "interface/gen-cpp2/meta_types.h" diff --git a/src/graph/executor/admin/ShowStatsExecutor.cpp b/src/graph/executor/admin/ShowStatsExecutor.cpp index d9c0715c8eb..56f60ba53a2 100644 --- a/src/graph/executor/admin/ShowStatsExecutor.cpp +++ b/src/graph/executor/admin/ShowStatsExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/admin/ShowStatsExecutor.h" + #include "graph/planner/plan/Admin.h" #include "graph/service/PermissionManager.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/admin/SignInServiceExecutor.cpp b/src/graph/executor/admin/SignInServiceExecutor.cpp index 34dfbd86d45..10d4b6ee130 100644 --- a/src/graph/executor/admin/SignInServiceExecutor.cpp +++ b/src/graph/executor/admin/SignInServiceExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/admin/SignInServiceExecutor.h" + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/admin/SnapshotExecutor.cpp b/src/graph/executor/admin/SnapshotExecutor.cpp index 2e7f81e4466..477de45ddc0 100644 --- a/src/graph/executor/admin/SnapshotExecutor.cpp +++ b/src/graph/executor/admin/SnapshotExecutor.cpp @@ -3,7 +3,9 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/admin/SnapshotExecutor.h" + #include + #include "graph/planner/plan/Admin.h" namespace nebula { diff --git a/src/graph/executor/logic/ArgumentExecutor.cpp b/src/graph/executor/logic/ArgumentExecutor.cpp index 0c1cd69c533..941690efab0 100644 --- a/src/graph/executor/logic/ArgumentExecutor.cpp +++ b/src/graph/executor/logic/ArgumentExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/logic/ArgumentExecutor.h" + #include "graph/planner/plan/Logic.h" namespace nebula { diff --git a/src/graph/executor/logic/LoopExecutor.cpp b/src/graph/executor/logic/LoopExecutor.cpp index b07687d3b41..e45faaa7696 100644 --- a/src/graph/executor/logic/LoopExecutor.cpp +++ b/src/graph/executor/logic/LoopExecutor.cpp @@ -3,8 +3,8 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/logic/LoopExecutor.h" -#include "graph/planner/plan/Logic.h" +#include "graph/planner/plan/Logic.h" namespace nebula { namespace graph { diff --git a/src/graph/executor/logic/SelectExecutor.cpp b/src/graph/executor/logic/SelectExecutor.cpp index 5fd0810311d..3df0d48a066 100644 --- a/src/graph/executor/logic/SelectExecutor.cpp +++ b/src/graph/executor/logic/SelectExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/logic/SelectExecutor.h" + #include "graph/planner/plan/Logic.h" namespace nebula { diff --git a/src/graph/executor/maintain/EdgeExecutor.cpp b/src/graph/executor/maintain/EdgeExecutor.cpp index 81ed6c1d8aa..ed4f144141d 100644 --- a/src/graph/executor/maintain/EdgeExecutor.cpp +++ b/src/graph/executor/maintain/EdgeExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/maintain/EdgeExecutor.h" + #include "graph/planner/plan/Maintain.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/maintain/FTIndexExecutor.cpp b/src/graph/executor/maintain/FTIndexExecutor.cpp index 78178142d14..4b32d3939c6 100644 --- a/src/graph/executor/maintain/FTIndexExecutor.cpp +++ b/src/graph/executor/maintain/FTIndexExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/maintain/FTIndexExecutor.h" + #include "graph/planner/plan/Maintain.h" #include "graph/util/FTIndexUtils.h" #include "interface/gen-cpp2/meta_types.h" diff --git a/src/graph/executor/maintain/TagExecutor.cpp b/src/graph/executor/maintain/TagExecutor.cpp index 2b3c842b227..c30578388dd 100644 --- a/src/graph/executor/maintain/TagExecutor.cpp +++ b/src/graph/executor/maintain/TagExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/maintain/TagExecutor.h" + #include "graph/planner/plan/Maintain.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp index 33341fd352d..b3df2670e68 100644 --- a/src/graph/executor/mutate/DeleteExecutor.cpp +++ b/src/graph/executor/mutate/DeleteExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/mutate/DeleteExecutor.h" + #include "graph/planner/plan/Mutate.h" #include "graph/service/GraphFlags.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index 14b75e8aff3..2b082c35d97 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/mutate/InsertExecutor.h" + #include "graph/planner/plan/Mutate.h" #include "graph/service/GraphFlags.h" diff --git a/src/graph/executor/query/AggregateExecutor.cpp b/src/graph/executor/query/AggregateExecutor.cpp index 5b36b8a9f66..76c54278359 100644 --- a/src/graph/executor/query/AggregateExecutor.cpp +++ b/src/graph/executor/query/AggregateExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/AggregateExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/AssignExecutor.cpp b/src/graph/executor/query/AssignExecutor.cpp index 016fbbb59fe..646d87d1003 100644 --- a/src/graph/executor/query/AssignExecutor.cpp +++ b/src/graph/executor/query/AssignExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/AssignExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/DataCollectExecutor.cpp b/src/graph/executor/query/DataCollectExecutor.cpp index 4edef430b23..fc14157221e 100644 --- a/src/graph/executor/query/DataCollectExecutor.cpp +++ b/src/graph/executor/query/DataCollectExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/DataCollectExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/FilterExecutor.cpp b/src/graph/executor/query/FilterExecutor.cpp index 87d9775c9bb..4c9482a7061 100644 --- a/src/graph/executor/query/FilterExecutor.cpp +++ b/src/graph/executor/query/FilterExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/FilterExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index 1972d0f1fd4..a129df1d3d2 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/GetEdgesExecutor.h" + #include "graph/planner/plan/Query.h" using nebula::storage::StorageClient; diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index d2aff232875..9c90e66c578 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/GetNeighborsExecutor.h" + #include "graph/service/GraphFlags.h" using nebula::storage::StorageClient; diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index 39631767b83..c74f715b037 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/IndexScanExecutor.h" + #include "graph/service/GraphFlags.h" using nebula::storage::StorageClient; diff --git a/src/graph/executor/query/InnerJoinExecutor.cpp b/src/graph/executor/query/InnerJoinExecutor.cpp index acebeabb3ce..e52c40b6cbd 100644 --- a/src/graph/executor/query/InnerJoinExecutor.cpp +++ b/src/graph/executor/query/InnerJoinExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/InnerJoinExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/IntersectExecutor.cpp b/src/graph/executor/query/IntersectExecutor.cpp index 5e2ae7fbe66..dfb1216a257 100644 --- a/src/graph/executor/query/IntersectExecutor.cpp +++ b/src/graph/executor/query/IntersectExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/IntersectExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/JoinExecutor.cpp b/src/graph/executor/query/JoinExecutor.cpp index c3d73cc487e..11fd5a18e71 100644 --- a/src/graph/executor/query/JoinExecutor.cpp +++ b/src/graph/executor/query/JoinExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/JoinExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/LeftJoinExecutor.cpp b/src/graph/executor/query/LeftJoinExecutor.cpp index 06725b00a62..fd43ac30139 100644 --- a/src/graph/executor/query/LeftJoinExecutor.cpp +++ b/src/graph/executor/query/LeftJoinExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/LeftJoinExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/LimitExecutor.cpp b/src/graph/executor/query/LimitExecutor.cpp index 83937194400..38502d2bf3b 100644 --- a/src/graph/executor/query/LimitExecutor.cpp +++ b/src/graph/executor/query/LimitExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/LimitExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/MinusExecutor.cpp b/src/graph/executor/query/MinusExecutor.cpp index d2b4e600f80..0ce5b9d9045 100644 --- a/src/graph/executor/query/MinusExecutor.cpp +++ b/src/graph/executor/query/MinusExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/MinusExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/ProjectExecutor.cpp b/src/graph/executor/query/ProjectExecutor.cpp index bae734c8d49..b4f80b9b484 100644 --- a/src/graph/executor/query/ProjectExecutor.cpp +++ b/src/graph/executor/query/ProjectExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/ProjectExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/SampleExecutor.cpp b/src/graph/executor/query/SampleExecutor.cpp index f955f2d4371..66cfcc9fd94 100644 --- a/src/graph/executor/query/SampleExecutor.cpp +++ b/src/graph/executor/query/SampleExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/SampleExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/ScanEdgesExecutor.cpp b/src/graph/executor/query/ScanEdgesExecutor.cpp index aa4a50201b3..de354a8748f 100644 --- a/src/graph/executor/query/ScanEdgesExecutor.cpp +++ b/src/graph/executor/query/ScanEdgesExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/ScanEdgesExecutor.h" + #include "graph/planner/plan/Query.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/query/SetExecutor.cpp b/src/graph/executor/query/SetExecutor.cpp index e0d7276087a..3681536a873 100644 --- a/src/graph/executor/query/SetExecutor.cpp +++ b/src/graph/executor/query/SetExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/SetExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/SortExecutor.cpp b/src/graph/executor/query/SortExecutor.cpp index 0e9086d8027..ebeba5cd854 100644 --- a/src/graph/executor/query/SortExecutor.cpp +++ b/src/graph/executor/query/SortExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/SortExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/TopNExecutor.cpp b/src/graph/executor/query/TopNExecutor.cpp index 150765d49f9..e685a8ff31b 100644 --- a/src/graph/executor/query/TopNExecutor.cpp +++ b/src/graph/executor/query/TopNExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/TopNExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 2460012dbf0..0210a5ce6c5 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/TraverseExecutor.h" + #include "clients/storage/StorageClient.h" #include "graph/service/GraphFlags.h" #include "graph/util/SchemaUtil.h" diff --git a/src/graph/executor/query/UnionAllVersionVarExecutor.cpp b/src/graph/executor/query/UnionAllVersionVarExecutor.cpp index 92557f31803..019b1b6261a 100644 --- a/src/graph/executor/query/UnionAllVersionVarExecutor.cpp +++ b/src/graph/executor/query/UnionAllVersionVarExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/UnionAllVersionVarExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/UnionExecutor.cpp b/src/graph/executor/query/UnionExecutor.cpp index b1731964c11..25e203ab632 100644 --- a/src/graph/executor/query/UnionExecutor.cpp +++ b/src/graph/executor/query/UnionExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/UnionExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/graph/executor/query/UnwindExecutor.cpp b/src/graph/executor/query/UnwindExecutor.cpp index 28fb8aa5718..de1d774e0a7 100644 --- a/src/graph/executor/query/UnwindExecutor.cpp +++ b/src/graph/executor/query/UnwindExecutor.cpp @@ -3,6 +3,7 @@ // This source code is licensed under Apache 2.0 License. #include "graph/executor/query/UnwindExecutor.h" + #include "graph/planner/plan/Query.h" namespace nebula { diff --git a/src/interface/common.thrift b/src/interface/common.thrift index 86b262e0910..5214b5b63a9 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -391,6 +391,7 @@ enum ErrorCode { E_JOB_NOT_FINISHED = -2048, E_TASK_REPORT_OUT_DATE = -2049, E_JOB_NOT_IN_SPACE = -2050, + E_JOB_NEED_RECOVER = -2051, E_INVALID_JOB = -2065, // Backup Failure diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index b073ffb4ecf..dfd3e96ae33 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -90,7 +90,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { break; } case nebula::meta::cpp2::JobOp::RECOVER: { - // Note that the last parameter is no longer spaceId + // Note that the last parameter is no longer spaceName std::vector jobIds; jobIds.reserve(paras.size()); for (size_t i = 0; i < paras.size(); i++) { @@ -125,13 +125,19 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq // Check if job not exists JobID jId = 0; - auto jobExist = jobMgr_->checkJobExist(spaceId_, type, paras, jId); - if (jobExist) { + auto runningJobExist = jobMgr_->checkOnRunningJobExist(spaceId_, type, paras, jId); + if (runningJobExist) { LOG(INFO) << "Job has already exists: " << jId; result.job_id_ref() = jId; return nebula::cpp2::ErrorCode::SUCCEEDED; } + auto retCode = jobMgr_->checkNeedRecoverJobExist(spaceId_, type); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "There is a failed data balance or zone balance job, need to recover it firstly!"; + return retCode; + } + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto jobId = autoIncrementId(); if (!nebula::ok(jobId)) { diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index a1328aa8d9a..16c505d61a6 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -159,8 +159,7 @@ class JobDescription { cpp2::JobDesc toJobDesc(); bool operator==(const JobDescription& that) const { - return space_ == that.space_ && type_ == that.type_ && paras_ == that.paras_ && - status_ == that.status_; + return space_ == that.space_ && type_ == that.type_ && paras_ == that.paras_; } bool operator!=(const JobDescription& that) const { diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index d6967693e98..d3e32a4b078 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -248,7 +248,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId, // Set the errorcode of the job nebula::cpp2::ErrorCode jobErrCode = nebula::cpp2::ErrorCode::SUCCEEDED; - if (jobStatus != cpp2::JobStatus::FINISHED) { + if (jobStatus == cpp2::JobStatus::FAILED) { // Traverse the tasks and find the first task errorcode unsuccessful auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId); std::unique_ptr iter; @@ -571,10 +571,10 @@ nebula::cpp2::ErrorCode JobManager::removeExpiredJobs( return ret; } -bool JobManager::checkJobExist(GraphSpaceID spaceId, - const cpp2::JobType& jobType, - const std::vector& paras, - JobID& jobId) { +bool JobManager::checkOnRunningJobExist(GraphSpaceID spaceId, + const cpp2::JobType& jobType, + const std::vector& paras, + JobID& jobId) { JobDescription jobDesc(spaceId, 0, jobType, paras); auto it = inFlightJobs_.begin(); while (it != inFlightJobs_.end()) { @@ -587,6 +587,35 @@ bool JobManager::checkJobExist(GraphSpaceID spaceId, return false; } +nebula::cpp2::ErrorCode JobManager::checkNeedRecoverJobExist(GraphSpaceID spaceId, + const cpp2::JobType& jobType) { + if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) { + std::unique_ptr iter; + auto jobPre = MetaKeyUtils::jobPrefix(spaceId); + auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "Fetch jobs failed, error: " << apache::thrift::util::enumNameSafe(retCode); + return retCode; + } + + for (; iter->valid(); iter->next()) { + if (!MetaKeyUtils::isJobKey(iter->key())) { + continue; + } + auto tup = MetaKeyUtils::parseJobVal(iter->val()); + auto type = std::get<0>(tup); + auto status = std::get<2>(tup); + if (type == cpp2::JobType::DATA_BALANCE || type == cpp2::JobType::ZONE_BALANCE) { + // QUEUE: The job has not been executed, the machine restarted + if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::QUEUE) { + return nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER; + } + } + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + ErrorOr>> JobManager::showJob(GraphSpaceID spaceId, JobID jobId) { auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId); @@ -661,6 +690,7 @@ ErrorOr JobManager::recoverJob( } else { std::vector jobKeys; jobKeys.reserve(jobIds.size()); + std::vector> totalJobKVs; for (int jobId : jobIds) { jobKeys.emplace_back(MetaKeyUtils::jobKey(spaceId, jobId)); } @@ -671,7 +701,132 @@ ErrorOr JobManager::recoverJob( return retCode.first; } for (size_t i = 0; i < jobKeys.size(); i++) { - jobKVs.emplace_back(std::make_pair(jobKeys[i], jobVals[i])); + totalJobKVs.emplace_back(std::make_pair(jobKeys[i], jobVals[i])); + } + + // For DATA_BALANCE and ZONE_BALANCE job, jobs with STOPPED, FAILED, QUEUE status + // !!! The following situations can be recovered, only for jobs of the same type + // of DATA_BALANCE or ZONE_BALANCE。 + // QUEUE: The job has not been executed, then the machine restarted. + // FAILED: + // The failed job will be recovered. + // FAILED and QUEUE jobs will not exist at the same time. + // STOPPED: + // If only one stopped jobId is specified, No FINISHED job or FAILED job of the + // same type after this job. + // If multiple jobs of the same type are specified, only starttime latest jobId + // will can be recovered, no FINISHED job or FAILED job of the same type after this latest job. + // The same type of STOPPED job exists in the following form, sorted by starttime: + // STOPPED job1, FAILED job2 + // recover job job1 failed + // recover job job2 success + // STOPPED job1, FINISHED job2, STOPPED job3 + // recover job job1 failed + // recover job job3 success + // recover job job1,job3 Only job3 can recover + std::unordered_map> dupResult; + std::unordered_map> dupkeyVal; + + for (auto& jobkv : totalJobKVs) { + auto optJobRet = JobDescription::makeJobDescription(jobkv.first, jobkv.second); + if (nebula::ok(optJobRet)) { + auto optJob = nebula::value(optJobRet); + auto jobStatus = optJob.getStatus(); + auto jobId = optJob.getJobId(); + auto jobType = optJob.getJobType(); + auto jobStartTime = optJob.getStartTime(); + if (jobStatus != cpp2::JobStatus::QUEUE && jobStatus != cpp2::JobStatus::FAILED && + jobStatus != cpp2::JobStatus::STOPPED) { + continue; + } + + // handle DATA_BALANCE and ZONE_BALANCE + if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) { + // FAILED and QUEUE jobs will not exist at the same time. + if (jobStatus == cpp2::JobStatus::FAILED || jobStatus == cpp2::JobStatus::QUEUE) { + dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus); + dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second)); + continue; + } + + // current recover job status is stopped + auto findJobIter = dupResult.find(jobType); + if (findJobIter != dupResult.end()) { + auto oldJobInfo = findJobIter->second; + if (std::get<2>(oldJobInfo) != cpp2::JobStatus::STOPPED) { + continue; + } + } + + // For a stopped job, check whether there is the same type of finished or + // failed job after it. + std::unique_ptr iter; + auto jobPre = MetaKeyUtils::jobPrefix(spaceId); + auto code = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "Fetch jobs failed, error: " << apache::thrift::util::enumNameSafe(code); + return code; + } + + bool findRest = false; + for (; iter->valid(); iter->next()) { + if (!MetaKeyUtils::isJobKey(iter->key())) { + continue; + } + + // eliminate oneself + auto keyPair = MetaKeyUtils::parseJobKey(iter->key()); + auto destJobId = keyPair.second; + if (destJobId == jobId) { + continue; + } + auto tup = MetaKeyUtils::parseJobVal(iter->val()); + auto destJobType = std::get<0>(tup); + auto destJobStatus = std::get<2>(tup); + auto destJobStartTime = std::get<3>(tup); + if (jobType == destJobType) { + // There is a specific type of failed job that does not allow recovery for the type of + // stopped job + if (destJobStatus == cpp2::JobStatus::FAILED) { + LOG(ERROR) << "There is a specific type of failed job that does not allow recovery " + "for the type of stopped job"; + findRest = true; + break; + } else if (destJobStatus == cpp2::JobStatus::FINISHED) { + // Compare the start time of the job + if (destJobStartTime > jobStartTime) { + findRest = true; + break; + } + } + } + } + if (!findRest) { + auto findStoppedJobIter = dupResult.find(jobType); + if (findStoppedJobIter != dupResult.end()) { + // update stopped job + auto oldJobInfo = findStoppedJobIter->second; + auto oldJobStartTime = std::get<1>(oldJobInfo); + if (jobStartTime > oldJobStartTime) { + auto oldJobId = std::get<0>(oldJobInfo); + dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus); + dupkeyVal.erase(oldJobId); + dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second)); + } + } else { + // insert + dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus); + dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second)); + } + } + } else { + jobKVs.emplace_back(std::make_pair(jobkv.first, jobkv.second)); + } + } + } + for (auto& key : dupResult) { + auto jId = std::get<0>(key.second); + jobKVs.emplace_back(dupkeyVal[jId]); } } @@ -684,7 +839,8 @@ ErrorOr JobManager::recoverJob( optJob.getStatus() == cpp2::JobStatus::STOPPED))) { // Check if the job exists JobID jId = 0; - auto jobExist = checkJobExist(spaceId, optJob.getJobType(), optJob.getParas(), jId); + auto jobExist = + checkOnRunningJobExist(spaceId, optJob.getJobType(), optJob.getParas(), jId); if (!jobExist) { auto jobId = optJob.getJobId(); enqueue(spaceId, jobId, JbOp::RECOVER, optJob.getJobType()); diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 22371b4df8b..3fc6b5c965e 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -92,18 +92,31 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { nebula::cpp2::ErrorCode addJob(JobDescription& jobDesc, AdminClient* client); /** - * @brief The same job is in jobMap + * @brief The same job in inFlightJobs_. + * Jobs in inFlightJobs_ have three status: + * QUEUE: when adding a job + * FAILED or STOPPED: when recover job * * @param spaceId * @param type * @param paras * @param jobId If the job exists, jobId is the id of the existing job - * @return + * @return True if job exists. + */ + bool checkOnRunningJobExist(GraphSpaceID spaceId, + const cpp2::JobType& type, + const std::vector& paras, + JobID& jobId); + /** + * @brief In the current space, if there is a failed data balance job or zone balance job, + * need to recover the job first, otherwise cannot add this type of job. + * + * @param spaceId + * @param jobType + * @return nebula::cpp2::ErrorCode */ - bool checkJobExist(GraphSpaceID spaceId, - const cpp2::JobType& type, - const std::vector& paras, - JobID& jobId); + nebula::cpp2::ErrorCode checkNeedRecoverJobExist(GraphSpaceID spaceId, + const cpp2::JobType& jobType); /** * @brief Load all jobs of the space from kvStore and convert to cpp2::JobDesc diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 4587e1758ed..48761f3223c 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -90,6 +90,22 @@ TEST_F(JobManagerTest, AddJob) { JobDescription jobDesc(spaceId, jobId, cpp2::JobType::COMPACT); auto rc = jobMgr->addJob(jobDesc, adminClient_.get()); ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); + + // If there is a failed data balance job, a new job cannot be added + JobID jobId3 = 3; + JobDescription jobDesc3(spaceId, jobId3, cpp2::JobType::DATA_BALANCE); + jobDesc3.setStatus(cpp2::JobStatus::FAILED); + auto jobKey = MetaKeyUtils::jobKey(jobDesc3.getSpace(), jobDesc3.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jobDesc3.getJobType(), + jobDesc3.getParas(), + jobDesc3.getStatus(), + jobDesc3.getStartTime(), + jobDesc3.getStopTime(), + jobDesc3.getErrorCode()); + jobMgr->save(std::move(jobKey), std::move(jobVal)); + + rc = jobMgr->checkNeedRecoverJobExist(spaceId, jobDesc3.getJobType()); + ASSERT_EQ(rc, nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER); } TEST_F(JobManagerTest, AddRebuildTagIndexJob) { @@ -287,7 +303,8 @@ TEST_F(JobManagerTest, JobDeduplication) { JobID jobId3 = 17; JobDescription jobDesc3(spaceId, jobId3, cpp2::JobType::LEADER_BALANCE); JobID jId3 = 0; - auto jobExist = jobMgr->checkJobExist(spaceId, jobDesc3.getJobType(), jobDesc3.getParas(), jId3); + auto jobExist = + jobMgr->checkOnRunningJobExist(spaceId, jobDesc3.getJobType(), jobDesc3.getParas(), jId3); if (!jobExist) { auto rc3 = jobMgr->addJob(jobDesc3, adminClient_.get()); ASSERT_EQ(rc3, nebula::cpp2::ErrorCode::SUCCEEDED); @@ -296,7 +313,8 @@ TEST_F(JobManagerTest, JobDeduplication) { JobID jobId4 = 18; JobDescription jobDesc4(spaceId, jobId4, cpp2::JobType::COMPACT); JobID jId4 = 0; - jobExist = jobMgr->checkJobExist(spaceId, jobDesc4.getJobType(), jobDesc4.getParas(), jId4); + jobExist = + jobMgr->checkOnRunningJobExist(spaceId, jobDesc4.getJobType(), jobDesc4.getParas(), jId4); if (!jobExist) { auto rc4 = jobMgr->addJob(jobDesc4, adminClient_.get()); ASSERT_NE(rc4, nebula::cpp2::ErrorCode::SUCCEEDED); @@ -524,26 +542,101 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { } TEST_F(JobManagerTest, RecoverJob) { - std::unique_ptr> jobMgr = getJobManager(); - // set status to prevent running the job since AdminClient is a injector - jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; - jobMgr->bgThread_.join(); - GraphSpaceID spaceId = 1; - int32_t nJob = 3; - for (auto jobId = 0; jobId < nJob; ++jobId) { - JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH); - auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); - auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), - jd.getParas(), - jd.getStatus(), - jd.getStartTime(), - jd.getStopTime(), - jd.getErrorCode()); - jobMgr->save(jobKey, jobVal); + // case 1,recover Queue status job + { + std::unique_ptr> jobMgr = getJobManager(); + // set status to prevent running the job since AdminClient is a injector + jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; + jobMgr->bgThread_.join(); + GraphSpaceID spaceId = 1; + int32_t nJob = 3; + for (auto jobId = 0; jobId < nJob; ++jobId) { + JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + + auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr); + ASSERT_EQ(nebula::value(nJobRecovered), 1); + } + + // case 2 + // For the same type of job, if there are stopped jobs and failed jobs in turn + // recover stopped job failed + // recover failed job succeeded + { + std::unique_ptr> jobMgr = getJobManager(); + // set status to prevent running the job since AdminClient is a injector + jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; + jobMgr->bgThread_.join(); + GraphSpaceID spaceId = 1; + int32_t nJob = 3; + for (auto jobId = 0; jobId < nJob; ++jobId) { + cpp2::JobStatus jobStatus; + if (jobId == 2) { + jobStatus = cpp2::JobStatus::FAILED; + } else { + jobStatus = cpp2::JobStatus::STOPPED; + } + JobDescription jd(spaceId, jobId, cpp2::JobType::DATA_BALANCE, {}, jobStatus); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jobId + 100000, + jobId + 110000, + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + + auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1}); + ASSERT_EQ(nebula::value(nJobRecovered), 0); + + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {2}); + ASSERT_EQ(nebula::value(nJobRecovered), 1); } - auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr); - ASSERT_EQ(nebula::value(nJobRecovered), 1); + // case 3 + // For the same type of job, if there are stopped jobs and finished jobs, stopped in turn + // recover stopped job befor finished job failed + // recover stopped job after finished job succeeded + { + std::unique_ptr> jobMgr = getJobManager(); + // set status to prevent running the job since AdminClient is a injector + jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; + jobMgr->bgThread_.join(); + GraphSpaceID spaceId = 1; + int32_t nJob = 4; + for (auto jobId = 0; jobId < nJob; ++jobId) { + cpp2::JobStatus jobStatus; + if (jobId == 2) { + jobStatus = cpp2::JobStatus::FINISHED; + } else { + jobStatus = cpp2::JobStatus::STOPPED; + } + JobDescription jd(spaceId, jobId, cpp2::JobType::DATA_BALANCE, {}, jobStatus); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jobId + 100000, + jobId + 110000, + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + + auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1}); + ASSERT_EQ(nebula::value(nJobRecovered), 0); + + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {3}); + ASSERT_EQ(nebula::value(nJobRecovered), 1); + } } TEST(JobDescriptionTest, Ctor) {