Balance fix #3668

Merged
merged 7 commits into from
Jan 20, 2022
6 changes: 5 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
@@ -366,11 +366,15 @@ nebula::cpp2::ErrorCode RaftPart::canAppendLogs() {

nebula::cpp2::ErrorCode RaftPart::canAppendLogs(TermID termId) {
DCHECK(!raftLock_.try_lock());
nebula::cpp2::ErrorCode rc = canAppendLogs();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return rc;
}
if (UNLIKELY(term_ != termId)) {
VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_;
return nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
}
return canAppendLogs();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void RaftPart::addLearner(const HostAddr& addr) {
16 changes: 13 additions & 3 deletions src/kvstore/raftex/RaftexService.cpp
@@ -155,10 +155,20 @@ void RaftexService::addPartition(std::shared_ptr<RaftPart> part) {
}

void RaftexService::removePartition(std::shared_ptr<RaftPart> part) {
folly::RWSpinLock::WriteHolder wh(partsLock_);
parts_.erase(std::make_pair(part->spaceId(), part->partitionId()));
using FType = decltype(folly::makeFuture());
using FTValype = typename FType::value_type;
// Stop the partition
part->stop();
folly::makeFuture()
.thenValue([this, &part](FTValype) {
folly::RWSpinLock::WriteHolder wh(partsLock_);
parts_.erase(std::make_pair(part->spaceId(), part->partitionId()));
})
// part->stop() waits for requestOnGoing_ in Host, and requestOnGoing_ is only released when
// the response is handled in ioThreadPool. This may cause a deadlock, so run stop() on
// another thread pool to avoid that situation
.via(folly::getGlobalCPUExecutor())
.thenValue([part](FTValype) { part->stop(); })
.wait();
}

std::shared_ptr<RaftPart> RaftexService::findPart(GraphSpaceID spaceId, PartitionID partId) {
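The new removePartition() defers the blocking part->stop() call to a different executor, as the inline comment explains. A minimal, self-contained sketch of that pattern, assuming folly is available (blockingStop is a hypothetical stand-in for part->stop(), i.e. a call that must not block a thread of the pool needed to unblock it):

```cpp
#include <chrono>
#include <thread>

#include <folly/executors/GlobalExecutor.h>
#include <folly/futures/Future.h>

// Hypothetical stand-in for part->stop(): a call that blocks until other work finishes.
void blockingStop() {
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

int main() {
  folly::makeFuture()
      // Hop to the global CPU executor so the blocking call does not occupy an IO thread
      // whose event loop may be needed to release whatever stop() is waiting for.
      .via(folly::getGlobalCPUExecutor())
      .thenValue([](folly::Unit) { blockingStop(); })
      .wait();  // the caller still waits for completion, but stop() runs on another pool
  return 0;
}
```

The key point is the .via() hop: the blocking wait runs on the global CPU executor, so the IO threads stay free to handle the responses that eventually release requestOnGoing_.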
30 changes: 30 additions & 0 deletions src/meta/processors/job/BalanceJobExecutor.cpp
@@ -88,6 +88,36 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::save(const std::string& k, const std
return rc;
}

void BalanceJobExecutor::insertOneTask(
const BalanceTask& task, std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
std::vector<BalanceTask>& taskVec = existTasks->operator[](task.getPartId());
if (taskVec.empty()) {
taskVec.emplace_back(task);
} else {
for (auto it = taskVec.begin(); it != taskVec.end(); it++) {
if (task.getDstHost() == it->getSrcHost() && task.getSrcHost() == it->getDstHost()) {
taskVec.erase(it);
return;
} else if (task.getDstHost() == it->getSrcHost()) {
BalanceTask newTask(task);
newTask.setDstHost(it->getDstHost());
taskVec.erase(it);
insertOneTask(newTask, existTasks);
return;
} else if (task.getSrcHost() == it->getDstHost()) {
BalanceTask newTask(task);
newTask.setSrcHost(it->getSrcHost());
taskVec.erase(it);
insertOneTask(newTask, existTasks);
return;
} else {
continue;
}
}
taskVec.emplace_back(task);
}
}

nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVStore* kvstore) {
spaceId_ = spaceId;
std::string spaceKey = MetaKeyUtils::spaceKey(spaceId);
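The new insertOneTask() folds each generated task into a per-partition list: a task that exactly reverses an existing move cancels it, and a task that chains onto an existing move (its destination equals the other's source, or its source equals the other's destination) collapses into one combined move. A toy, self-contained sketch of that merging rule, using a hypothetical simplified Move type instead of the PR's BalanceTask:

```cpp
#include <iostream>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for BalanceTask: a single (src, dst) host move.
struct Move {
  std::string src, dst;
};

// Same merging rule as insertOneTask, restated for one partition's list of moves.
void insertMove(Move m, std::vector<Move>& moves) {
  for (auto it = moves.begin(); it != moves.end(); ++it) {
    if (m.dst == it->src && m.src == it->dst) {  // exact reverse: the two cancel out
      moves.erase(it);
      return;
    }
    if (m.dst == it->src) {  // m feeds into *it: collapse to {m.src -> it->dst}
      Move merged{m.src, it->dst};
      moves.erase(it);
      insertMove(merged, moves);
      return;
    }
    if (m.src == it->dst) {  // *it feeds into m: collapse to {it->src -> m.dst}
      Move merged{it->src, m.dst};
      moves.erase(it);
      insertMove(merged, moves);
      return;
    }
  }
  moves.push_back(m);
}

int main() {
  std::vector<Move> moves;
  insertMove({"A", "B"}, moves);
  insertMove({"B", "C"}, moves);  // chains into a single A -> C
  std::cout << moves.size() << " move(s): " << moves[0].src << " -> " << moves[0].dst << "\n";
  insertMove({"C", "A"}, moves);  // exact reverse of A -> C, so both disappear
  std::cout << moves.size() << " move(s)\n";
}
```

Collapsing chains and cancelling round-trips keeps the plan from shuffling the same partition through intermediate hosts.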
3 changes: 3 additions & 0 deletions src/meta/processors/job/BalanceJobExecutor.h
@@ -73,6 +73,9 @@ class BalanceJobExecutor : public MetaJobExecutor {
return Status::OK();
}

void insertOneTask(const BalanceTask& task,
std::map<PartitionID, std::vector<BalanceTask>>* existTasks);

protected:
std::unique_ptr<BalancePlan> plan_;
std::unique_ptr<folly::Executor> executor_;
1 change: 0 additions & 1 deletion src/meta/processors/job/BalancePlan.cpp
@@ -234,7 +234,6 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<BalanceTask>> BalancePlan::getBalan
if (task.ret_ == BalanceTaskResult::FAILED || task.ret_ == BalanceTaskResult::INVALID) {
task.ret_ = BalanceTaskResult::IN_PROGRESS;
}
task.status_ = BalanceTaskStatus::START;
auto activeHostRet = ActiveHostsMan::isLived(kv, task.dst_);
if (!nebula::ok(activeHostRet)) {
auto retCode = nebula::error(activeHostRet);
5 changes: 5 additions & 0 deletions src/meta/processors/job/BalancePlan.h
@@ -95,6 +95,11 @@ class BalancePlan {

void setFinishCallBack(std::function<void(meta::cpp2::JobStatus)> func);

template <typename InputIterator>
void insertTask(InputIterator first, InputIterator last) {
tasks_.insert(tasks_.end(), first, last);
}

private:
JobDescription jobDescription_;
kvstore::KVStore* kv_ = nullptr;
21 changes: 21 additions & 0 deletions src/meta/processors/job/BalanceTask.h
@@ -30,6 +30,7 @@ class BalanceTask {
FRIEND_TEST(BalanceTest, TryToRecoveryTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, StopPlanTest);
FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);

public:
BalanceTask() = default;
@@ -68,6 +69,26 @@
return ret_;
}

const HostAddr& getSrcHost() const {
return src_;
}

const HostAddr& getDstHost() const {
return dst_;
}

void setSrcHost(const HostAddr& host) {
src_ = host;
}

void setDstHost(const HostAddr& host) {
dst_ = host;
}

PartitionID getPartId() const {
return partId_;
}

private:
std::string buildTaskId() {
return folly::stringPrintf("%d, %d:%d", jobId_, spaceId_, partId_);
58 changes: 34 additions & 24 deletions src/meta/processors/job/DataBalanceJobExecutor.cpp
@@ -63,23 +63,26 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
return l->parts_.size() < r->parts_.size();
});
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
std::map<PartitionID, std::vector<BalanceTask>> existTasks;
// move parts of lost hosts to active hosts in the same zone
for (auto& zoneHostEntry : lostZoneHost) {
const std::string& zoneName = zoneHostEntry.first;
std::vector<Host*>& lostHostVec = zoneHostEntry.second;
std::vector<Host*>& activeVec = activeSortedHost[zoneName];
if (activeVec.size() == 0) {
return Status::Error("zone %s has no host", zoneName.c_str());
}
for (Host* host : lostHostVec) {
for (PartitionID partId : host->parts_) {
Host* dstHost = activeVec.front();
dstHost->parts_.insert(partId);
plan_->addTask(BalanceTask(jobId_,
spaceInfo_.spaceId_,
partId,
host->host_,
dstHost->host_,
kvstore_,
adminClient_));
existTasks[partId].emplace_back(jobId_,
spaceInfo_.spaceId_,
partId,
host->host_,
dstHost->host_,
kvstore_,
adminClient_);
for (size_t i = 0; i < activeVec.size() - 1; i++) {
if (activeVec[i]->parts_.size() > activeVec[i + 1]->parts_.size()) {
std::swap(activeVec[i], activeVec[i + 1]);
@@ -93,23 +96,22 @@
}
lostZoneHost.clear();
// rebalance for hosts in a zone
auto balanceHostVec = [this](std::vector<Host*>& hostVec) -> std::vector<BalanceTask> {
auto balanceHostVec = [this, &existTasks](std::vector<Host*>& hostVec) {
size_t totalPartNum = 0;
size_t avgPartNum = 0;
for (Host* h : hostVec) {
totalPartNum += h->parts_.size();
}
if (hostVec.empty()) {
LOG(ERROR) << "rebalance error: zone has no host";
return {};
return;
}
avgPartNum = totalPartNum / hostVec.size();
size_t remainder = totalPartNum - avgPartNum * hostVec.size();
size_t leftBegin = 0;
size_t leftEnd = 0;
size_t rightBegin = 0;
size_t rightEnd = hostVec.size();
std::vector<BalanceTask> tasks;
for (size_t i = 0; i < hostVec.size(); i++) {
if (avgPartNum <= hostVec[i]->parts_.size()) {
leftEnd = i;
@@ -136,13 +138,14 @@
PartitionID partId = *(srcHost->parts_.begin());
hostVec[leftBegin]->parts_.insert(partId);
srcHost->parts_.erase(partId);
tasks.emplace_back(jobId_,
spaceInfo_.spaceId_,
partId,
srcHost->host_,
hostVec[leftBegin]->host_,
kvstore_,
adminClient_);
insertOneTask(BalanceTask(jobId_,
spaceInfo_.spaceId_,
partId,
srcHost->host_,
hostVec[leftBegin]->host_,
kvstore_,
adminClient_),
&existTasks);
size_t leftIndex = leftBegin;
for (; leftIndex < leftEnd - 1; leftIndex++) {
if (hostVec[leftIndex]->parts_.size() > hostVec[leftIndex + 1]->parts_.size()) {
@@ -158,18 +161,25 @@
leftEnd = rightBegin;
}
}
return tasks;
};
for (auto& pair : activeSortedHost) {
std::vector<Host*>& hvec = pair.second;
std::vector<BalanceTask> tasks = balanceHostVec(hvec);
for (BalanceTask& task : tasks) {
plan_->addTask(std::move(task));
}
balanceHostVec(hvec);
}
if (plan_->tasks().empty()) {
bool emty = std::find_if(existTasks.begin(),
existTasks.end(),
[](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
return !p.second.empty();
}) == existTasks.end();
if (emty) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
std::for_each(existTasks.begin(),
existTasks.end(),
[this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
plan_->insertTask(p.second.begin(), p.second.end());
});
nebula::cpp2::ErrorCode rc = plan_->saveInStore();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("save balance zone plan failed");
6 changes: 3 additions & 3 deletions src/meta/processors/job/JobManager.cpp
@@ -144,7 +144,7 @@ void JobManager::scheduleThread() {

// @return: true if all task dispatched, else false
bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
std::lock_guard<std::mutex> lk(muJobFinished_);
std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
Contributor:
Why do we need to use recursive_mutex here?

Contributor Author:
If all balance tasks succeed and the job is stopped before the job status is updated, the balance tasks will acquire muJobFinished_ through the callback on the same thread.

Contributor:
I got it. Please add some comments. Or maybe triggering the callback of BalancePlan in another thread would be clearer.

Contributor Author (@liwenhui-soul, Jan 17, 2022):
I have added a comment at its declaration.

std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();
@@ -174,7 +174,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
if (jobExec->isMetaJob()) {
jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) {
if (status == meta::cpp2::JobStatus::STOPPED) {
std::lock_guard<std::mutex> lkg(muJobFinished_);
std::lock_guard<std::recursive_mutex> lkg(muJobFinished_);
cleanJob(jobDesc.getJobId());
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
@@ -206,7 +206,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job
LOG(INFO) << folly::sformat(
"{}, jobId={}, result={}", __func__, jobId, apache::thrift::util::enumNameSafe(jobStatus));
// normal job finish may race to job stop
std::lock_guard<std::mutex> lk(muJobFinished_);
std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_);
if (!nebula::ok(optJobDescRet)) {
LOG(WARNING) << folly::sformat("can't load job, jobId={}", jobId);
4 changes: 3 additions & 1 deletion src/meta/processors/job/JobManager.h
@@ -158,7 +158,9 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
AdminClient* adminClient_{nullptr};

std::mutex muReportFinish_;
std::mutex muJobFinished_;
// The reason for using recursive_mutex is that a meta job may try to acquire this lock
// in its finish-callback on the same thread that runs runJobInternal
std::recursive_mutex muJobFinished_;
std::atomic<JbmgrStatus> status_ = JbmgrStatus::NOT_START;
};

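The comment above describes the re-entrant case: a meta job's finish-callback can run on the same thread that already holds the lock inside runJobInternal. A minimal standalone sketch (simplified names, not the PR's code) showing why a plain std::mutex would deadlock there while std::recursive_mutex does not:

```cpp
#include <functional>
#include <iostream>
#include <mutex>

// Mirrors JobManager::muJobFinished_ in a stripped-down form.
std::recursive_mutex muJobFinished;

void cleanJob() {
  // Re-entrant acquisition: fine with recursive_mutex, undefined behavior
  // (typically a deadlock) with std::mutex.
  std::lock_guard<std::recursive_mutex> lk(muJobFinished);
  std::cout << "job cleaned\n";
}

void runJobInternal(const std::function<void()>& finishCallback) {
  std::lock_guard<std::recursive_mutex> lk(muJobFinished);
  // If the job finishes synchronously (e.g. all balance tasks already succeeded and the
  // job was stopped), the finish-callback fires right here, on the same thread,
  // while the lock is still held.
  finishCallback();
}

int main() {
  runJobInternal(cleanJob);
}
```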
29 changes: 19 additions & 10 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
@@ -113,7 +113,7 @@ HostAddr ZoneBalanceJobExecutor::insertPartIntoZone(
nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
std::vector<Zone*>* sortedActiveZones,
std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
std::vector<BalanceTask>* tasks) {
std::map<PartitionID, std::vector<BalanceTask>>* existTasks) {
std::vector<Zone*>& sortedActiveZonesRef = *sortedActiveZones;
std::map<std::string, std::vector<Host*>>& sortedZoneHostsRef = *sortedZoneHosts;
int32_t totalPartNum = 0;
@@ -147,8 +147,9 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) {
if (!sortedActiveZonesRef[leftIndex]->partExist(partId)) {
HostAddr dst = insertPartIntoZone(sortedZoneHosts, sortedActiveZonesRef[leftIndex], partId);
tasks->emplace_back(
jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_);
insertOneTask(
BalanceTask(jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_),
existTasks);
int32_t newLeftIndex = leftIndex;
for (; newLeftIndex < leftEnd - 1; newLeftIndex++) {
if (sortedActiveZonesRef[newLeftIndex]->partNum_ >
@@ -242,7 +243,6 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
if (activeSize < spaceInfo_.replica_) {
return Status::Error("Not enough alive zones to hold replica");
}
std::vector<BalanceTask> tasks;

std::vector<Zone*> sortedActiveZones;
sortedActiveZones.reserve(activeZones.size());
@@ -285,6 +285,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
return ha;
};

std::map<PartitionID, std::vector<BalanceTask>> existTasks;
// move parts of lost zones to active zones
for (auto& zoneMapEntry : lostZones) {
Zone* zone = zoneMapEntry.second;
@@ -293,7 +294,7 @@ Zone* zone = zoneMapEntry.second;
Host& host = hostMapEntry.second;
for (PartitionID partId : host.parts_) {
HostAddr dst = chooseZoneToInsert(partId);
tasks.emplace_back(
existTasks[partId].emplace_back(
jobId_, spaceInfo_.spaceId_, partId, hostAddr, dst, kvstore_, adminClient_);
}
host.parts_.clear();
@@ -302,15 +303,23 @@
}

// all parts of lost zones have moved to active zones, then rebalance the active zones
nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &tasks);
nebula::cpp2::ErrorCode rc =
rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &existTasks);

if (tasks.empty() || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
bool emty = std::find_if(existTasks.begin(),
existTasks.end(),
[](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
return !p.second.empty();
}) == existTasks.end();
if (emty || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
for (BalanceTask& task : tasks) {
plan_->addTask(std::move(task));
}
std::for_each(existTasks.begin(),
existTasks.end(),
[this](std::pair<const PartitionID, std::vector<BalanceTask>>& p) {
plan_->insertTask(p.second.begin(), p.second.end());
});
rc = plan_->saveInStore();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("save balance zone plan failed");
5 changes: 4 additions & 1 deletion src/meta/processors/job/ZoneBalanceJobExecutor.h
@@ -17,6 +17,8 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
FRIEND_TEST(BalanceTest, BalanceZoneRemainderPlanTest);
FRIEND_TEST(BalanceTest, NormalZoneTest);
FRIEND_TEST(BalanceTest, StopPlanTest);
FRIEND_TEST(BalanceTest, BalanceZonePlanComplexTest);
FRIEND_TEST(BalanceTest, NormalZoneComplexTest);

public:
ZoneBalanceJobExecutor(JobDescription jobDescription,
@@ -25,6 +27,7 @@ const std::vector<std::string>& params)
const std::vector<std::string>& params)
: BalanceJobExecutor(jobDescription.getJobId(), kvstore, adminClient, params),
jobDescription_(jobDescription) {}

nebula::cpp2::ErrorCode prepare() override;
nebula::cpp2::ErrorCode stop() override;

@@ -38,7 +41,7 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
nebula::cpp2::ErrorCode rebalanceActiveZones(
std::vector<Zone*>* sortedActiveZones,
std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
std::vector<BalanceTask>* tasks);
std::map<PartitionID, std::vector<BalanceTask>>* existTasks);

private:
std::vector<std::string> lostZones_;