Balance fix (#3668)
* fix core dump in zone balance when removing the only host in a zone

* fix balance plan generating redundant part moves

* fix deadlock when runInternal and the finish callback are called on the same thread

* fix transleader returning TERM_OUT_OF_DATE

* continue running a balance task from its last status

* fix deadlock when removing a part

Co-authored-by: Sophie <[email protected]>
liwenhui-soul and Sophie-Xie committed Jan 22, 2022
1 parent 1ef3b18 commit 461794c
Showing 14 changed files with 240 additions and 46 deletions.
6 changes: 5 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
@@ -366,11 +366,15 @@ nebula::cpp2::ErrorCode RaftPart::canAppendLogs() {

nebula::cpp2::ErrorCode RaftPart::canAppendLogs(TermID termId) {
DCHECK(!raftLock_.try_lock());
nebula::cpp2::ErrorCode rc = canAppendLogs();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
if (UNLIKELY(term_ != termId)) {
VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_;
return nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
}
return canAppendLogs();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void RaftPart::addLearner(const HostAddr& addr) {
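A minimal sketch of what the reordered checks mean for a caller; illustrative only (the helper name and proposedTerm are made up, and it assumes raftLock_ is already held, as the DCHECK requires). The generic append preconditions are evaluated first, so E_RAFT_TERM_OUT_OF_DATE is reported only when everything else passed but the proposing term is stale.

```cpp
// Hypothetical helper inside RaftPart; not part of the commit.
nebula::cpp2::ErrorCode tryAppend(TermID proposedTerm) {
  nebula::cpp2::ErrorCode rc = canAppendLogs(proposedTerm);
  if (rc == nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE) {
    // The generic checks passed but term_ has moved on: refresh leader/term info and retry.
  } else if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
    // Some other append precondition failed; it is no longer masked by the term check.
  }
  return rc;
}
```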
16 changes: 13 additions & 3 deletions src/kvstore/raftex/RaftexService.cpp
@@ -155,10 +155,20 @@ void RaftexService::addPartition(std::shared_ptr<RaftPart> part) {
}

void RaftexService::removePartition(std::shared_ptr<RaftPart> part) {
folly::RWSpinLock::WriteHolder wh(partsLock_);
parts_.erase(std::make_pair(part->spaceId(), part->partitionId()));
using FType = decltype(folly::makeFuture());
using FTValype = typename FType::value_type;
// Stop the partition
part->stop();
folly::makeFuture()
.thenValue([this, &part](FTValype) {
folly::RWSpinLock::WriteHolder wh(partsLock_);
parts_.erase(std::make_pair(part->spaceId(), part->partitionId()));
})
// part->stop() waits for requestOnGoing_ in Host, and requestOnGoing_ is only released
// when the response is handled in the ioThreadPool. Running stop() inline could therefore
// deadlock, so it is done on another thread pool instead.
.via(folly::getGlobalCPUExecutor())
.thenValue([part](FTValype) { part->stop(); })
.wait();
}

std::shared_ptr<RaftPart> RaftexService::findPart(GraphSpaceID spaceId, PartitionID partId) {
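The deadlock-avoidance pattern above in isolation, as a hedged sketch; only the folly calls already used in the diff are assumed, and the function name and comments are illustrative.

```cpp
#include <folly/executors/GlobalExecutor.h>
#include <folly/futures/Future.h>

// Sketch: run the blocking stop on a CPU executor so the ioThreadPool stays free
// to deliver the responses that stop() is waiting for.
void removeThenStop() {
  folly::makeFuture()
      .thenValue([](folly::Unit) {
        // Fast bookkeeping (e.g. erasing the part from the map) can stay here.
      })
      .via(folly::getGlobalCPUExecutor())
      .thenValue([](folly::Unit) {
        // Blocking work: wait for in-flight requests to drain, then stop the part.
      })
      .wait();
}
```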
30 changes: 30 additions & 0 deletions src/meta/processors/job/BalanceJobExecutor.cpp
@@ -88,6 +88,36 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::save(const std::string& k, const std
return rc;
}

void BalanceJobExecutor::insertOneTask(
const BalanceTask& task, std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
std::vector<BalanceTask>& taskVec = existTasks->operator[](task.getPartId());
if (taskVec.empty()) {
taskVec.emplace_back(task);
} else {
for (auto it = taskVec.begin(); it != taskVec.end(); it++) {
if (task.getDstHost() == it->getSrcHost() && task.getSrcHost() == it->getDstHost()) {
taskVec.erase(it);
return;
} else if (task.getDstHost() == it->getSrcHost()) {
BalanceTask newTask(task);
newTask.setDstHost(it->getDstHost());
taskVec.erase(it);
insertOneTask(newTask, existTasks);
return;
} else if (task.getSrcHost() == it->getDstHost()) {
BalanceTask newTask(task);
newTask.setSrcHost(it->getSrcHost());
taskVec.erase(it);
insertOneTask(newTask, existTasks);
return;
} else {
continue;
}
}
taskVec.emplace_back(task);
}
}

nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVStore* kvstore) {
spaceId_ = spaceId;
std::string spaceKey = MetaKeyUtils::spaceKey(spaceId);
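A hedged usage sketch of how insertOneTask collapses redundant moves, written as if inside BalanceJobExecutor; hostA/hostB/hostC, jobId, spaceId, kv and client are placeholders, not values from the commit.

```cpp
std::map<PartitionID, std::vector<BalanceTask>> existTasks;

// Plan: move part 1 from hostA to hostB.
insertOneTask(BalanceTask(jobId, spaceId, 1, hostA, hostB, kv, client), &existTasks);

// A later pass wants part 1 moved from hostB to hostC. Instead of keeping two tasks
// (A->B then B->C), they are merged into a single A->C task.
insertOneTask(BalanceTask(jobId, spaceId, 1, hostB, hostC, kv, client), &existTasks);

// Asking for the exact reverse of a pending move (C->A against the merged A->C task)
// erases it: nothing is left to move for part 1.
insertOneTask(BalanceTask(jobId, spaceId, 1, hostC, hostA, kv, client), &existTasks);
```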
3 changes: 3 additions & 0 deletions src/meta/processors/job/BalanceJobExecutor.h
@@ -73,6 +73,9 @@ class BalanceJobExecutor : public MetaJobExecutor {
return Status::OK();
}

void insertOneTask(const BalanceTask& task,
std::map<PartitionID, std::vector<BalanceTask>>* existTasks);

protected:
std::unique_ptr<BalancePlan> plan_;
std::unique_ptr<folly::Executor> executor_;
1 change: 0 additions & 1 deletion src/meta/processors/job/BalancePlan.cpp
@@ -234,7 +234,6 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<BalanceTask>> BalancePlan::getBalan
if (task.ret_ == BalanceTaskResult::FAILED || task.ret_ == BalanceTaskResult::INVALID) {
task.ret_ = BalanceTaskResult::IN_PROGRESS;
}
task.status_ = BalanceTaskStatus::START;
auto activeHostRet = ActiveHostsMan::isLived(kv, task.dst_);
if (!nebula::ok(activeHostRet)) {
auto retCode = nebula::error(activeHostRet);
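A hedged reading of the change above, relevant to the "continue running a balance task from its last status" item: only the task result is reset while the persisted status_ is kept, so a recovered task resumes where it stopped instead of redoing finished stages.

```cpp
// Sketch of the recovery path after this change (enum values taken from the diff).
if (task.ret_ == BalanceTaskResult::FAILED || task.ret_ == BalanceTaskResult::INVALID) {
  task.ret_ = BalanceTaskResult::IN_PROGRESS;  // make the task runnable again
}
// task.status_ is intentionally left as persisted; the removed line used to force it
// back to BalanceTaskStatus::START, repeating work the task had already completed.
```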
5 changes: 5 additions & 0 deletions src/meta/processors/job/BalancePlan.h
@@ -95,6 +95,11 @@ class BalancePlan {

void setFinishCallBack(std::function<void(meta::cpp2::JobStatus)> func);

template <typename InputIterator>
void insertTask(InputIterator first, InputIterator last) {
tasks_.insert(tasks_.end(), first, last);
}

private:
JobDescription jobDescription_;
kvstore::KVStore* kv_ = nullptr;
21 changes: 21 additions & 0 deletions src/meta/processors/job/BalanceTask.h
@@ -30,6 +30,7 @@ class BalanceTask {
FRIEND_TEST(BalanceTest, TryToRecoveryTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, StopPlanTest);
FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);

public:
BalanceTask() = default;
@@ -68,6 +69,26 @@
return ret_;
}

const HostAddr& getSrcHost() const {
return src_;
}

const HostAddr& getDstHost() const {
return dst_;
}

void setSrcHost(const HostAddr& host) {
src_ = host;
}

void setDstHost(const HostAddr& host) {
dst_ = host;
}

PartitionID getPartId() const {
return partId_;
}

private:
std::string buildTaskId() {
return folly::stringPrintf("%d, %d:%d", jobId_, spaceId_, partId_);
58 changes: 34 additions & 24 deletions src/meta/processors/job/DataBalanceJobExecutor.cpp
@@ -63,23 +63,26 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
return l->parts_.size() < r->parts_.size();
});
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
std::map<PartitionID, std::vector<BalanceTask>> existTasks;
// move parts of lost hosts to active hosts in the same zone
for (auto& zoneHostEntry : lostZoneHost) {
const std::string& zoneName = zoneHostEntry.first;
std::vector<Host*>& lostHostVec = zoneHostEntry.second;
std::vector<Host*>& activeVec = activeSortedHost[zoneName];
if (activeVec.size() == 0) {
return Status::Error("zone %s has no host", zoneName.c_str());
}
for (Host* host : lostHostVec) {
for (PartitionID partId : host->parts_) {
Host* dstHost = activeVec.front();
dstHost->parts_.insert(partId);
plan_->addTask(BalanceTask(jobId_,
spaceInfo_.spaceId_,
partId,
host->host_,
dstHost->host_,
kvstore_,
adminClient_));
existTasks[partId].emplace_back(jobId_,
spaceInfo_.spaceId_,
partId,
host->host_,
dstHost->host_,
kvstore_,
adminClient_);
for (size_t i = 0; i < activeVec.size() - 1; i++) {
if (activeVec[i]->parts_.size() > activeVec[i + 1]->parts_.size()) {
std::swap(activeVec[i], activeVec[i + 1]);
Expand All @@ -93,23 +96,22 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
}
lostZoneHost.clear();
// rebalance for hosts in a zone
auto balanceHostVec = [this](std::vector<Host*>& hostVec) -> std::vector<BalanceTask> {
auto balanceHostVec = [this, &existTasks](std::vector<Host*>& hostVec) {
size_t totalPartNum = 0;
size_t avgPartNum = 0;
for (Host* h : hostVec) {
totalPartNum += h->parts_.size();
}
if (hostVec.empty()) {
LOG(ERROR) << "rebalance error: zone has no host";
return {};
return;
}
avgPartNum = totalPartNum / hostVec.size();
size_t remainder = totalPartNum - avgPartNum * hostVec.size();
size_t leftBegin = 0;
size_t leftEnd = 0;
size_t rightBegin = 0;
size_t rightEnd = hostVec.size();
std::vector<BalanceTask> tasks;
for (size_t i = 0; i < hostVec.size(); i++) {
if (avgPartNum <= hostVec[i]->parts_.size()) {
leftEnd = i;
@@ -136,13 +138,14 @@
PartitionID partId = *(srcHost->parts_.begin());
hostVec[leftBegin]->parts_.insert(partId);
srcHost->parts_.erase(partId);
tasks.emplace_back(jobId_,
spaceInfo_.spaceId_,
partId,
srcHost->host_,
hostVec[leftBegin]->host_,
kvstore_,
adminClient_);
insertOneTask(BalanceTask(jobId_,
spaceInfo_.spaceId_,
partId,
srcHost->host_,
hostVec[leftBegin]->host_,
kvstore_,
adminClient_),
&existTasks);
size_t leftIndex = leftBegin;
for (; leftIndex < leftEnd - 1; leftIndex++) {
if (hostVec[leftIndex]->parts_.size() > hostVec[leftIndex + 1]->parts_.size()) {
@@ -158,18 +161,25 @@
leftEnd = rightBegin;
}
}
return tasks;
};
for (auto& pair : activeSortedHost) {
std::vector<Host*>& hvec = pair.second;
std::vector<BalanceTask> tasks = balanceHostVec(hvec);
for (BalanceTask& task : tasks) {
plan_->addTask(std::move(task));
}
balanceHostVec(hvec);
}
if (plan_->tasks().empty()) {
bool emty = std::find_if(existTasks.begin(),
existTasks.end(),
[](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
return !p.second.empty();
}) == existTasks.end();
if (emty) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
std::for_each(existTasks.begin(),
existTasks.end(),
[this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
plan_->insertTask(p.second.begin(), p.second.end());
});
nebula::cpp2::ErrorCode rc = plan_->saveInStore();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("save balance zone plan failed");
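The emptiness test above asks whether any partition still has a pending task; a hedged equivalent using std::any_of, shown only to make the intent explicit (not part of the commit):

```cpp
#include <algorithm>

bool hasTask = std::any_of(existTasks.begin(), existTasks.end(),
                           [](const auto& p) { return !p.second.empty(); });
if (!hasTask) {
  return Status::Balanced();  // no partition needs to move
}
```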
6 changes: 3 additions & 3 deletions src/meta/processors/job/JobManager.cpp
@@ -144,7 +144,7 @@ void JobManager::scheduleThread() {

// @return: true if all task dispatched, else false
bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
std::lock_guard<std::mutex> lk(muJobFinished_);
std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();
@@ -174,7 +174,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
if (jobExec->isMetaJob()) {
jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) {
if (status == meta::cpp2::JobStatus::STOPPED) {
std::lock_guard<std::mutex> lkg(muJobFinished_);
std::lock_guard<std::recursive_mutex> lkg(muJobFinished_);
cleanJob(jobDesc.getJobId());
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
@@ -206,7 +206,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job
LOG(INFO) << folly::sformat(
"{}, jobId={}, result={}", __func__, jobId, apache::thrift::util::enumNameSafe(jobStatus));
// normal job finish may race to job stop
std::lock_guard<std::mutex> lk(muJobFinished_);
std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_);
if (!nebula::ok(optJobDescRet)) {
LOG(WARNING) << folly::sformat("can't load job, jobId={}", jobId);
4 changes: 3 additions & 1 deletion src/meta/processors/job/JobManager.h
@@ -158,7 +158,9 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
AdminClient* adminClient_{nullptr};

std::mutex muReportFinish_;
std::mutex muJobFinished_;
// recursive_mutex is used because a meta job may try to acquire this lock from its
// finish callback on the same thread that is running runJobInternal
std::recursive_mutex muJobFinished_;
std::atomic<JbmgrStatus> status_ = JbmgrStatus::NOT_START;
};

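A minimal self-contained sketch of the re-entrancy problem the comment describes; names are illustrative, not the real JobManager code. With a plain std::mutex the second lock attempt on the same thread deadlocks, while std::recursive_mutex allows it.

```cpp
#include <mutex>

std::recursive_mutex muJobFinished;

void onJobFinished() {
  // Re-entrant acquisition: fine with recursive_mutex, a deadlock with std::mutex.
  std::lock_guard<std::recursive_mutex> lk(muJobFinished);
  // ... clean up the finished job ...
}

void runJobInternal() {
  std::lock_guard<std::recursive_mutex> lk(muJobFinished);
  // A meta job may invoke its finish callback synchronously, i.e. on this same
  // thread, before runJobInternal returns.
  onJobFinished();
}
```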
29 changes: 19 additions & 10 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
@@ -113,7 +113,7 @@ HostAddr ZoneBalanceJobExecutor::insertPartIntoZone(
nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
std::vector<Zone*>* sortedActiveZones,
std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
std::vector<BalanceTask>* tasks) {
std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
std::vector<Zone*>& sortedActiveZonesRef = *sortedActiveZones;
std::map<std::string, std::vector<Host*>>& sortedZoneHostsRef = *sortedZoneHosts;
int32_t totalPartNum = 0;
@@ -147,8 +147,9 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) {
if (!sortedActiveZonesRef[leftIndex]->partExist(partId)) {
HostAddr dst = insertPartIntoZone(sortedZoneHosts, sortedActiveZonesRef[leftIndex], partId);
tasks->emplace_back(
jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_);
insertOneTask(
BalanceTask(jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_),
existTasks);
int32_t newLeftIndex = leftIndex;
for (; newLeftIndex < leftEnd - 1; newLeftIndex++) {
if (sortedActiveZonesRef[newLeftIndex]->partNum_ >
@@ -242,7 +243,6 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
if (activeSize < spaceInfo_.replica_) {
return Status::Error("Not enough alive zones to hold replica");
}
std::vector<BalanceTask> tasks;

std::vector<Zone*> sortedActiveZones;
sortedActiveZones.reserve(activeZones.size());
@@ -285,6 +285,7 @@
return ha;
};

std::map<PartitionID, std::vector<BalanceTask>> existTasks;
// move parts of lost zones to active zones
for (auto& zoneMapEntry : lostZones) {
Zone* zone = zoneMapEntry.second;
@@ -293,7 +294,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
Host& host = hostMapEntry.second;
for (PartitionID partId : host.parts_) {
HostAddr dst = chooseZoneToInsert(partId);
tasks.emplace_back(
existTasks[partId].emplace_back(
jobId_, spaceInfo_.spaceId_, partId, hostAddr, dst, kvstore_, adminClient_);
}
host.parts_.clear();
@@ -302,15 +303,23 @@
}

// all parts of lost zones have moved to active zones, then rebalance the active zones
nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &tasks);
nebula::cpp2::ErrorCode rc =
rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &existTasks);

if (tasks.empty() || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
bool emty = std::find_if(existTasks.begin(),
existTasks.end(),
[](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
return !p.second.empty();
}) == existTasks.end();
if (emty || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
for (BalanceTask& task : tasks) {
plan_->addTask(std::move(task));
}
std::for_each(existTasks.begin(),
existTasks.end(),
[this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
plan_->insertTask(p.second.begin(), p.second.end());
});
rc = plan_->saveInStore();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("save balance zone plan failed");
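A hedged worked example of why the zone plan now collects everything in existTasks and funnels rebalance moves through insertOneTask; all hosts and IDs below are made up. A part evacuated from a lost zone may land on a host that the later active-zone rebalance wants to move again, and merging the two phases avoids copying the part twice.

```cpp
// Phase 1 (evacuate a lost zone): part 7 is planned to move lostHost -> hostA.
existTasks[7].emplace_back(jobId, spaceId, 7, lostHost, hostA, kv, client);

// Phase 2 (rebalance active zones) wants part 7 moved hostA -> hostB. insertOneTask
// merges it with the pending task into a single lostHost -> hostB move.
insertOneTask(BalanceTask(jobId, spaceId, 7, hostA, hostB, kv, client), &existTasks);
```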
5 changes: 4 additions & 1 deletion src/meta/processors/job/ZoneBalanceJobExecutor.h
@@ -17,6 +17,8 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
FRIEND_TEST(BalanceTest, BalanceZoneRemainderPlanTest);
FRIEND_TEST(BalanceTest, NormalZoneTest);
FRIEND_TEST(BalanceTest, StopPlanTest);
FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);
FRIEND_TEST(BalanceTest, NormalZoneComplexTest);

public:
ZoneBalanceJobExecutor(JobDescription jobDescription,
Expand All @@ -25,6 +27,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
const std::vector<std::string>& params)
: BalanceJobExecutor(jobDescription.getJobId(), kvstore, adminClient, params),
jobDescription_(jobDescription) {}

nebula::cpp2::ErrorCode prepare() override;
nebula::cpp2::ErrorCode stop() override;

@@ -38,7 +41,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
nebula::cpp2::ErrorCode rebalanceActiveZones(
std::vector<Zone*>* sortedActiveZones,
std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
std::vector<BalanceTask>* tasks);
std::map<PartitionID, std::vector<BalanceTask>>* existTasks);

private:
std::vector<std::string> lostZones_;
Expand Down