diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index 07a4064052a..4d49bf60972 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -366,11 +366,15 @@ nebula::cpp2::ErrorCode RaftPart::canAppendLogs() {
 
 nebula::cpp2::ErrorCode RaftPart::canAppendLogs(TermID termId) {
   DCHECK(!raftLock_.try_lock());
+  nebula::cpp2::ErrorCode rc = canAppendLogs();
+  if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    return rc;
+  }
   if (UNLIKELY(term_ != termId)) {
     VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_;
     return nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
   }
-  return canAppendLogs();
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
 }
 
 void RaftPart::addLearner(const HostAddr& addr) {
diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp
index ba7d926584f..16ffda8b6ff 100644
--- a/src/kvstore/raftex/RaftexService.cpp
+++ b/src/kvstore/raftex/RaftexService.cpp
@@ -155,10 +155,20 @@ void RaftexService::addPartition(std::shared_ptr<RaftPart> part) {
 }
 
 void RaftexService::removePartition(std::shared_ptr<RaftPart> part) {
-  folly::RWSpinLock::WriteHolder wh(partsLock_);
-  parts_.erase(std::make_pair(part->spaceId(), part->partitionId()));
+  using FType = decltype(folly::makeFuture());
+  using FTValType = typename FType::value_type;
   // Stop the partition
-  part->stop();
+  folly::makeFuture()
+      .thenValue([this, &part](FTValType) {
+        folly::RWSpinLock::WriteHolder wh(partsLock_);
+        parts_.erase(std::make_pair(part->spaceId(), part->partitionId()));
+      })
+      // part->stop() waits for requestOnGoing_ in Host, and requestOnGoing_ is only released
+      // when the response is handled in the ioThreadPool; stopping on that pool could therefore
+      // deadlock, so the stop is run on another thread pool instead.
+      .via(folly::getGlobalCPUExecutor())
+      .thenValue([part](FTValType) { part->stop(); })
+      .wait();
 }
 
 std::shared_ptr<RaftPart> RaftexService::findPart(GraphSpaceID spaceId, PartitionID partId) {
diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp
index bb52f199319..d354fff681c 100644
--- a/src/meta/processors/job/BalanceJobExecutor.cpp
+++ b/src/meta/processors/job/BalanceJobExecutor.cpp
@@ -88,6 +88,36 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::save(const std::string& k, const std
   return rc;
 }
 
+void BalanceJobExecutor::insertOneTask(
+    const BalanceTask& task, std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
+  std::vector<BalanceTask>& taskVec = existTasks->operator[](task.getPartId());
+  if (taskVec.empty()) {
+    taskVec.emplace_back(task);
+  } else {
+    for (auto it = taskVec.begin(); it != taskVec.end(); it++) {
+      if (task.getDstHost() == it->getSrcHost() && task.getSrcHost() == it->getDstHost()) {
+        taskVec.erase(it);
+        return;
+      } else if (task.getDstHost() == it->getSrcHost()) {
+        BalanceTask newTask(task);
+        newTask.setDstHost(it->getDstHost());
+        taskVec.erase(it);
+        insertOneTask(newTask, existTasks);
+        return;
+      } else if (task.getSrcHost() == it->getDstHost()) {
+        BalanceTask newTask(task);
+        newTask.setSrcHost(it->getSrcHost());
+        taskVec.erase(it);
+        insertOneTask(newTask, existTasks);
+        return;
+      } else {
+        continue;
+      }
+    }
+    taskVec.emplace_back(task);
+  }
+}
+
 nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVStore* kvstore) {
   spaceId_ = spaceId;
   std::string spaceKey = MetaKeyUtils::spaceKey(spaceId);
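The insertOneTask() helper added above deduplicates balance tasks per partition: two opposite moves cancel out, and two moves that chain through a common host collapse into a single move. A minimal, self-contained sketch of the same merge rules (using a hypothetical Move struct instead of the real BalanceTask) could look like this:

#include <map>
#include <string>
#include <vector>

struct Move {
  std::string src, dst;  // a partition replica moves from src to dst
};

void insertMove(const Move& m, int partId, std::map<int, std::vector<Move>>* moves) {
  auto& vec = (*moves)[partId];
  for (auto it = vec.begin(); it != vec.end(); ++it) {
    if (m.dst == it->src && m.src == it->dst) {  // A->B followed by B->A: the moves cancel
      vec.erase(it);
      return;
    }
    if (m.dst == it->src) {  // new move ends where an existing one starts: chain into A->C
      Move merged{m.src, it->dst};
      vec.erase(it);
      insertMove(merged, partId, moves);
      return;
    }
    if (m.src == it->dst) {  // an existing move ends where the new one starts: chain as well
      Move merged{it->src, m.dst};
      vec.erase(it);
      insertMove(merged, partId, moves);
      return;
    }
  }
  vec.push_back(m);  // unrelated to every existing move for this partition
}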
diff --git a/src/meta/processors/job/BalanceJobExecutor.h b/src/meta/processors/job/BalanceJobExecutor.h
index a284c3b6485..64c9c7286b3 100644
--- a/src/meta/processors/job/BalanceJobExecutor.h
+++ b/src/meta/processors/job/BalanceJobExecutor.h
@@ -73,6 +73,9 @@ class BalanceJobExecutor : public MetaJobExecutor {
     return Status::OK();
   }
 
+  void insertOneTask(const BalanceTask& task,
+                     std::map<PartitionID, std::vector<BalanceTask>>* existTasks);
+
  protected:
   std::unique_ptr<BalancePlan> plan_;
   std::unique_ptr<folly::Executor> executor_;
diff --git a/src/meta/processors/job/BalancePlan.cpp b/src/meta/processors/job/BalancePlan.cpp
index 84ed777e57f..40678b0c528 100644
--- a/src/meta/processors/job/BalancePlan.cpp
+++ b/src/meta/processors/job/BalancePlan.cpp
@@ -234,7 +234,6 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<BalanceTask>> BalancePlan::getBalan
     if (task.ret_ == BalanceTaskResult::FAILED || task.ret_ == BalanceTaskResult::INVALID) {
       task.ret_ = BalanceTaskResult::IN_PROGRESS;
     }
-    task.status_ = BalanceTaskStatus::START;
     auto activeHostRet = ActiveHostsMan::isLived(kv, task.dst_);
     if (!nebula::ok(activeHostRet)) {
       auto retCode = nebula::error(activeHostRet);
diff --git a/src/meta/processors/job/BalancePlan.h b/src/meta/processors/job/BalancePlan.h
index 8aed704c9a9..902633ef161 100644
--- a/src/meta/processors/job/BalancePlan.h
+++ b/src/meta/processors/job/BalancePlan.h
@@ -95,6 +95,11 @@ class BalancePlan {
 
   void setFinishCallBack(std::function<void(meta::cpp2::JobStatus)> func);
 
+  template <class InputIterator>
+  void insertTask(InputIterator first, InputIterator last) {
+    tasks_.insert(tasks_.end(), first, last);
+  }
+
  private:
   JobDescription jobDescription_;
   kvstore::KVStore* kv_ = nullptr;
diff --git a/src/meta/processors/job/BalanceTask.h b/src/meta/processors/job/BalanceTask.h
index b9fbc36acfd..a07d4869c2a 100644
--- a/src/meta/processors/job/BalanceTask.h
+++ b/src/meta/processors/job/BalanceTask.h
@@ -30,6 +30,7 @@ class BalanceTask {
   FRIEND_TEST(BalanceTest, TryToRecoveryTest);
   FRIEND_TEST(BalanceTest, RecoveryTest);
   FRIEND_TEST(BalanceTest, StopPlanTest);
+  FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);
 
  public:
   BalanceTask() = default;
@@ -68,6 +69,26 @@ class BalanceTask {
     return ret_;
   }
 
+  const HostAddr& getSrcHost() const {
+    return src_;
+  }
+
+  const HostAddr& getDstHost() const {
+    return dst_;
+  }
+
+  void setSrcHost(const HostAddr& host) {
+    src_ = host;
+  }
+
+  void setDstHost(const HostAddr& host) {
+    dst_ = host;
+  }
+
+  PartitionID getPartId() const {
+    return partId_;
+  }
+
  private:
   std::string buildTaskId() {
     return folly::stringPrintf("%d, %d:%d", jobId_, spaceId_, partId_);
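The new BalanceTask accessors and the BalancePlan::insertTask() range overload are used together by the executors further down: tasks are first grouped per partition, merged through insertOneTask(), and only then copied into the plan. A rough sketch of the intended call pattern, assuming the executor members (jobId_, spaceId_, kvstore_, adminClient_, plan_) from the diffs below; the local names are illustrative only:

// Group and merge tasks per partition, then bulk-insert them into the plan.
std::map<PartitionID, std::vector<BalanceTask>> existTasks;
insertOneTask(BalanceTask(jobId_, spaceId_, partId, srcHost, dstHost, kvstore_, adminClient_),
              &existTasks);
// ... more insertOneTask() calls ...
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
for (auto& entry : existTasks) {
  plan_->insertTask(entry.second.begin(), entry.second.end());
}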
diff --git a/src/meta/processors/job/DataBalanceJobExecutor.cpp b/src/meta/processors/job/DataBalanceJobExecutor.cpp
index e2abaca40fc..e5666f94202 100644
--- a/src/meta/processors/job/DataBalanceJobExecutor.cpp
+++ b/src/meta/processors/job/DataBalanceJobExecutor.cpp
@@ -63,23 +63,26 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
       return l->parts_.size() < r->parts_.size();
     });
   }
-  plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
+  std::map<PartitionID, std::vector<BalanceTask>> existTasks;
   // move parts of lost hosts to active hosts in the same zone
   for (auto& zoneHostEntry : lostZoneHost) {
     const std::string& zoneName = zoneHostEntry.first;
     std::vector<Host*>& lostHostVec = zoneHostEntry.second;
     std::vector<Host*>& activeVec = activeSortedHost[zoneName];
+    if (activeVec.size() == 0) {
+      return Status::Error("zone %s has no host", zoneName.c_str());
+    }
     for (Host* host : lostHostVec) {
       for (PartitionID partId : host->parts_) {
         Host* dstHost = activeVec.front();
         dstHost->parts_.insert(partId);
-        plan_->addTask(BalanceTask(jobId_,
-                                   spaceInfo_.spaceId_,
-                                   partId,
-                                   host->host_,
-                                   dstHost->host_,
-                                   kvstore_,
-                                   adminClient_));
+        existTasks[partId].emplace_back(jobId_,
+                                        spaceInfo_.spaceId_,
+                                        partId,
+                                        host->host_,
+                                        dstHost->host_,
+                                        kvstore_,
+                                        adminClient_);
         for (size_t i = 0; i < activeVec.size() - 1; i++) {
           if (activeVec[i]->parts_.size() > activeVec[i + 1]->parts_.size()) {
             std::swap(activeVec[i], activeVec[i + 1]);
@@ -93,7 +96,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
   }
   lostZoneHost.clear();
   // rebalance for hosts in a zone
-  auto balanceHostVec = [this](std::vector<Host*>& hostVec) -> std::vector<BalanceTask> {
+  auto balanceHostVec = [this, &existTasks](std::vector<Host*>& hostVec) {
     size_t totalPartNum = 0;
     size_t avgPartNum = 0;
     for (Host* h : hostVec) {
@@ -101,7 +104,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
     }
     if (hostVec.empty()) {
       LOG(ERROR) << "rebalance error: zone has no host";
-      return {};
+      return;
     }
     avgPartNum = totalPartNum / hostVec.size();
     size_t remainder = totalPartNum - avgPartNum * hostVec.size();
@@ -109,7 +112,6 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
     size_t leftEnd = 0;
     size_t rightBegin = 0;
     size_t rightEnd = hostVec.size();
-    std::vector<BalanceTask> tasks;
    for (size_t i = 0; i < hostVec.size(); i++) {
      if (avgPartNum <= hostVec[i]->parts_.size()) {
        leftEnd = i;
@@ -136,13 +138,14 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
         PartitionID partId = *(srcHost->parts_.begin());
         hostVec[leftBegin]->parts_.insert(partId);
         srcHost->parts_.erase(partId);
-        tasks.emplace_back(jobId_,
-                           spaceInfo_.spaceId_,
-                           partId,
-                           srcHost->host_,
-                           hostVec[leftBegin]->host_,
-                           kvstore_,
-                           adminClient_);
+        insertOneTask(BalanceTask(jobId_,
+                                  spaceInfo_.spaceId_,
+                                  partId,
+                                  srcHost->host_,
+                                  hostVec[leftBegin]->host_,
+                                  kvstore_,
+                                  adminClient_),
+                      &existTasks);
         size_t leftIndex = leftBegin;
         for (; leftIndex < leftEnd - 1; leftIndex++) {
           if (hostVec[leftIndex]->parts_.size() > hostVec[leftIndex + 1]->parts_.size()) {
@@ -158,18 +161,25 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
         leftEnd = rightBegin;
       }
     }
-    return tasks;
   };
   for (auto& pair : activeSortedHost) {
     std::vector<Host*>& hvec = pair.second;
-    std::vector<BalanceTask> tasks = balanceHostVec(hvec);
-    for (BalanceTask& task : tasks) {
-      plan_->addTask(std::move(task));
-    }
+    balanceHostVec(hvec);
   }
-  if (plan_->tasks().empty()) {
+  bool empty = std::find_if(existTasks.begin(),
+                            existTasks.end(),
+                            [](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                              return !p.second.empty();
+                            }) == existTasks.end();
+  if (empty) {
     return Status::Balanced();
   }
+  plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
+  std::for_each(existTasks.begin(),
+                existTasks.end(),
+                [this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                  plan_->insertTask(p.second.begin(), p.second.end());
+                });
   nebula::cpp2::ErrorCode rc = plan_->saveInStore();
   if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
     return Status::Error("save balance zone plan failed");
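The emptiness check above scans the per-partition map for any non-empty task vector before building a plan. Functionally it is the same as a std::none_of over the map values; a small, self-contained equivalent with plain placeholder types for illustration:

#include <algorithm>
#include <map>
#include <vector>

// Returns true when no partition has a pending task, i.e. the balance plan would be empty.
bool noPendingTasks(const std::map<int, std::vector<int>>& existTasks) {
  return std::none_of(existTasks.begin(), existTasks.end(), [](const auto& p) {
    return !p.second.empty();
  });
}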
diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp
index 795ade3c73c..33753d71575 100644
--- a/src/meta/processors/job/JobManager.cpp
+++ b/src/meta/processors/job/JobManager.cpp
@@ -144,7 +144,7 @@ void JobManager::scheduleThread() {
 
 // @return: true if all task dispatched, else false
 bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
-  std::lock_guard<std::mutex> lk(muJobFinished_);
+  std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
   std::unique_ptr<JobExecutor> je =
       JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
   JobExecutor* jobExec = je.get();
@@ -174,7 +174,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
   if (jobExec->isMetaJob()) {
     jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) {
       if (status == meta::cpp2::JobStatus::STOPPED) {
-        std::lock_guard<std::mutex> lkg(muJobFinished_);
+        std::lock_guard<std::recursive_mutex> lkg(muJobFinished_);
         cleanJob(jobDesc.getJobId());
         return nebula::cpp2::ErrorCode::SUCCEEDED;
       } else {
@@ -206,7 +206,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job
   LOG(INFO) << folly::sformat(
       "{}, jobId={}, result={}", __func__, jobId, apache::thrift::util::enumNameSafe(jobStatus));
   // normal job finish may race to job stop
-  std::lock_guard<std::mutex> lk(muJobFinished_);
+  std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
   auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_);
   if (!nebula::ok(optJobDescRet)) {
     LOG(WARNING) << folly::sformat("can't load job, jobId={}", jobId);
diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h
index 52e73584058..f8532b61dfa 100644
--- a/src/meta/processors/job/JobManager.h
+++ b/src/meta/processors/job/JobManager.h
@@ -158,7 +158,9 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
   AdminClient* adminClient_{nullptr};
 
   std::mutex muReportFinish_;
-  std::mutex muJobFinished_;
+  // A recursive_mutex is used because a meta job may try to acquire this lock again from its
+  // finish callback, on the same thread that already holds it in runJobInternal.
+  std::recursive_mutex muJobFinished_;
   std::atomic<JbmgrStatus> status_ = JbmgrStatus::NOT_START;
 };
 
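The switch from std::mutex to std::recursive_mutex matters because runJobInternal() holds muJobFinished_ while it invokes the meta-job finish callback, and that callback may lock the same mutex again on the same thread. A minimal illustration of the pattern (a standalone sketch, not the real JobManager code):

#include <mutex>

class Runner {
 public:
  void runJob() {
    std::lock_guard<std::recursive_mutex> lk(mu_);  // with std::mutex this would deadlock below
    finishCallback();                               // callback re-enters on the same thread
  }

 private:
  void finishCallback() {
    std::lock_guard<std::recursive_mutex> lk(mu_);  // re-acquiring is allowed for recursive_mutex
    // clean up job state ...
  }

  std::recursive_mutex mu_;
};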
diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp
index a152bd02d75..14a514e2dd6 100644
--- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp
+++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp
@@ -113,7 +113,7 @@ HostAddr ZoneBalanceJobExecutor::insertPartIntoZone(
 nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
     std::vector<Zone*>* sortedActiveZones,
     std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
-    std::vector<BalanceTask>* tasks) {
+    std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
   std::vector<Zone*>& sortedActiveZonesRef = *sortedActiveZones;
   std::map<std::string, std::vector<Host*>>& sortedZoneHostsRef = *sortedZoneHosts;
   int32_t totalPartNum = 0;
@@ -147,8 +147,9 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
     for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) {
       if (!sortedActiveZonesRef[leftIndex]->partExist(partId)) {
         HostAddr dst = insertPartIntoZone(sortedZoneHosts, sortedActiveZonesRef[leftIndex], partId);
-        tasks->emplace_back(
-            jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_);
+        insertOneTask(
+            BalanceTask(jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_),
+            existTasks);
         int32_t newLeftIndex = leftIndex;
         for (; newLeftIndex < leftEnd - 1; newLeftIndex++) {
           if (sortedActiveZonesRef[newLeftIndex]->partNum_ >
@@ -242,7 +243,6 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
   if (activeSize < spaceInfo_.replica_) {
     return Status::Error("Not enough alive zones to hold replica");
   }
-  std::vector<BalanceTask> tasks;
 
   std::vector<Zone*> sortedActiveZones;
   sortedActiveZones.reserve(activeZones.size());
@@ -285,6 +285,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
     return ha;
   };
 
+  std::map<PartitionID, std::vector<BalanceTask>> existTasks;
   // move parts of lost zones to active zones
   for (auto& zoneMapEntry : lostZones) {
     Zone* zone = zoneMapEntry.second;
@@ -293,7 +294,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
       Host& host = hostMapEntry.second;
       for (PartitionID partId : host.parts_) {
         HostAddr dst = chooseZoneToInsert(partId);
-        tasks.emplace_back(
+        existTasks[partId].emplace_back(
             jobId_, spaceInfo_.spaceId_, partId, hostAddr, dst, kvstore_, adminClient_);
       }
       host.parts_.clear();
@@ -302,15 +303,23 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
   }
 
   // all parts of lost zones have moved to active zones, then rebalance the active zones
-  nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &tasks);
+  nebula::cpp2::ErrorCode rc =
+      rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &existTasks);
 
-  if (tasks.empty() || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
+  bool empty = std::find_if(existTasks.begin(),
+                            existTasks.end(),
+                            [](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                              return !p.second.empty();
+                            }) == existTasks.end();
+  if (empty || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
     return Status::Balanced();
   }
   plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
-  for (BalanceTask& task : tasks) {
-    plan_->addTask(std::move(task));
-  }
+  std::for_each(existTasks.begin(),
+                existTasks.end(),
+                [this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
+                  plan_->insertTask(p.second.begin(), p.second.end());
+                });
   rc = plan_->saveInStore();
   if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
     return Status::Error("save balance zone plan failed");
diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.h b/src/meta/processors/job/ZoneBalanceJobExecutor.h
index 798675191b5..e264e7b822a 100644
--- a/src/meta/processors/job/ZoneBalanceJobExecutor.h
+++ b/src/meta/processors/job/ZoneBalanceJobExecutor.h
@@ -17,6 +17,8 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
   FRIEND_TEST(BalanceTest, BalanceZoneRemainderPlanTest);
   FRIEND_TEST(BalanceTest, NormalZoneTest);
   FRIEND_TEST(BalanceTest, StopPlanTest);
+  FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);
+  FRIEND_TEST(BalanceTest, NormalZoneComplexTest);
 
  public:
   ZoneBalanceJobExecutor(JobDescription jobDescription,
@@ -25,6 +27,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
                          const std::vector<std::string>& params)
       : BalanceJobExecutor(jobDescription.getJobId(), kvstore, adminClient, params),
         jobDescription_(jobDescription) {}
+
   nebula::cpp2::ErrorCode prepare() override;
 
   nebula::cpp2::ErrorCode stop() override;
@@ -38,7 +41,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
   nebula::cpp2::ErrorCode rebalanceActiveZones(
       std::vector<Zone*>* sortedActiveZones,
       std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
-      std::vector<BalanceTask>* tasks);
+      std::map<PartitionID, std::vector<BalanceTask>>* existTasks);
 
  private:
   std::vector<std::string> lostZones_;
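The zone counts asserted in BalanceZonePlanComplexTest below follow from simple replica arithmetic (the 389-task count is whatever the planner produces and is not derived here):

  512 partitions x 3 replicas = 1536 part replicas
  1536 / 5 zones = 307 per zone, remainder 1  ->  one zone holds 308, the other four hold 307
  1536 / 4 zones = 384 per zone once z1 is drained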
diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp
index 340e8f3260e..0221a639543 100644
--- a/src/meta/test/BalancerTest.cpp
+++ b/src/meta/test/BalancerTest.cpp
@@ -246,6 +246,45 @@ TEST(BalanceTest, BalanceZonePlanTest) {
   EXPECT_EQ(balancer.spaceInfo_.zones_["zone5"].partNum_, 9);
 }
 
+TEST(BalanceTest, BalanceZonePlanComplexTest) {
+  fs::TempDir rootPath("/tmp/BalanceZoneTest.XXXXXX");
+  std::unique_ptr<kvstore::KVStore> store = MockCluster::initMetaKV(rootPath.path());
+  std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair1(
+      "z1", {{{"127.0.0.1", 11}, {}}});
+  std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair2(
+      "z2", {{{"127.0.0.1", 12}, {}}});
+  std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair3(
+      "z3", {{{"127.0.0.1", 13}, {}}});
+  std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair4(
+      "z4", {{{"127.0.0.1", 14}, {}}});
+  std::pair<std::string, std::vector<std::pair<HostAddr, std::vector<PartitionID>>>> pair5(
+      "z5", {{{"127.0.0.1", 15}, {}}});
+  for (int32_t i = 1; i <= 512; i++) {
+    pair1.second.front().second.push_back(i);
+    pair2.second.front().second.push_back(i);
+    pair3.second.front().second.push_back(i);
+  }
+  SpaceInfo spaceInfo = createSpaceInfo("space1", 1, 3, {pair1, pair2, pair3, pair4, pair5});
+  ZoneBalanceJobExecutor balancer(JobDescription(), store.get(), nullptr, {});
+  balancer.spaceInfo_ = spaceInfo;
+  Status status = balancer.buildBalancePlan();
+  EXPECT_EQ(status, Status::OK());
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z1"].partNum_, 308);
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z2"].partNum_, 307);
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z3"].partNum_, 307);
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z4"].partNum_, 307);
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z5"].partNum_, 307);
+  balancer.lostZones_ = {"z1"};
+  status = balancer.buildBalancePlan();
+  EXPECT_EQ(status, Status::OK());
+  EXPECT_EQ(balancer.plan_->tasks().size(), 389);
+  auto tasks = balancer.plan_->tasks();
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z2"].partNum_, 384);
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z3"].partNum_, 384);
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z4"].partNum_, 384);
+  EXPECT_EQ(balancer.spaceInfo_.zones_["z5"].partNum_, 384);
+}
+
 TEST(BalanceTest, BalanceZoneRemainderPlanTest) {
   fs::TempDir rootPath("/tmp/BalanceZoneTest.XXXXXX");
   std::unique_ptr<kvstore::KVStore> store = MockCluster::initMetaKV(rootPath.path());
@@ -668,6 +707,34 @@ void verifyMetaZone(kvstore::KVStore* kv,
   EXPECT_EQ(zoneSet, expectZones);
 }
 
+void verifyZonePartNum(kvstore::KVStore* kv,
+                       GraphSpaceID spaceId,
+                       const std::map<std::string, int32_t>& zones) {
+  std::map<HostAddr, std::string> hostZone;
+  std::map<std::string, int32_t> zoneNum;
+  std::unique_ptr<kvstore::KVIterator> iter;
+  for (const auto& [zone, num] : zones) {
+    std::string zoneKey = MetaKeyUtils::zoneKey(zone);
+    std::string value;
+    kv->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &value);
+    auto hosts = MetaKeyUtils::parseZoneHosts(value);
+    for (auto& host : hosts) {
+      hostZone[host] = zone;
+    }
+    zoneNum[zone] = 0;
+  }
+  const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId);
+  kv->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter);
+  while (iter->valid()) {
+    auto hosts = MetaKeyUtils::parsePartVal(iter->val());
+    for (auto& host : hosts) {
+      zoneNum[hostZone[host]]++;
+    }
+    iter->next();
+  }
+  EXPECT_EQ(zoneNum, zones);
+}
+
 JobDescription makeJobDescription(kvstore::KVStore* kv, cpp2::AdminCmd cmd) {
   JobDescription jd(testJobId.fetch_add(1, std::memory_order_relaxed), cmd, {});
   std::vector<kvstore::KV> data;
@@ -779,8 +846,12 @@ TEST(BalanceTest, RecoveryTest) {
                     partCount,
                     6);
   balancer.recovery();
-  verifyBalanceTask(
-      kv, balancer.jobId_, BalanceTaskStatus::START, BalanceTaskResult::IN_PROGRESS, partCount, 6);
+  verifyBalanceTask(kv,
+                    balancer.jobId_,
+                    BalanceTaskStatus::CATCH_UP_DATA,
+                    BalanceTaskResult::IN_PROGRESS,
+                    partCount,
+                    6);
   baton.reset();
   balancer.setFinishCallBack([&](meta::cpp2::JobStatus) {
     baton.post();
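The verifyZonePartNum() helper added above can then be called after a balance job has run to assert how many part replicas each zone ends up holding; a hypothetical call (the kv, spaceId, zone names, and counts are illustrative):

verifyZonePartNum(kv, spaceId, {{"z2", 384}, {"z3", 384}, {"z4", 384}, {"z5", 384}});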
diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h
index 903840e34eb..8e08afdfd89 100644
--- a/src/meta/test/TestUtils.h
+++ b/src/meta/test/TestUtils.h
@@ -218,6 +218,33 @@ class TestUtils {
     baton.wait();
   }
 
+  static void addZoneToSpace(kvstore::KVStore* kv,
+                             GraphSpaceID id,
+                             const std::vector<std::string>& zones) {
+    std::string spaceKey = MetaKeyUtils::spaceKey(id);
+    std::string spaceVal;
+    kv->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceVal);
+    meta::cpp2::SpaceDesc properties = MetaKeyUtils::parseSpace(spaceVal);
+    std::vector<std::string> curZones = properties.get_zone_names();
+    curZones.insert(curZones.end(), zones.begin(), zones.end());
+    properties.zone_names_ref() = curZones;
+    std::vector<kvstore::KV> data;
+    data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties));
+    folly::Baton<true, std::atomic> baton;
+    auto ret = nebula::cpp2::ErrorCode::SUCCEEDED;
+    kv->asyncMultiPut(kDefaultSpaceId,
+                      kDefaultPartId,
+                      std::move(data),
+                      [&ret, &baton](nebula::cpp2::ErrorCode code) {
+                        if (nebula::cpp2::ErrorCode::SUCCEEDED != code) {
+                          ret = code;
+                          LOG(INFO) << "Put data error on meta server";
+                        }
+                        baton.post();
+                      });
+    baton.wait();
+  }
+
   static void assembleSpaceWithZone(kvstore::KVStore* kv,
                                     GraphSpaceID id,
                                     int32_t partitionNum,