Skip to content

Commit

Permalink
Address whitewum's comments and redesign the invoke for balancePlan
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Jun 28, 2019
1 parent 2a441c5 commit eba3bb9
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 54 deletions.
111 changes: 76 additions & 35 deletions src/meta/processors/admin/BalancePlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,96 @@
#include <folly/synchronization/Baton.h>
#include "meta/processors/Common.h"

DEFINE_uint32(task_concurrency, 10, "The tasks number could be invoked simultaneously");

namespace nebula {
namespace meta {

const std::string kBalancePlanTable = "__b_plan__"; // NOLINT

void BalancePlan::invoke() {
status_ = Status::IN_PROGRESS;
// TODO(heng) we want tasks for the same part to be invoked serially.
void BalancePlan::dispatchTasks() {
// Key -> spaceID + partID, Val -> List of task index in tasks_;
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, std::vector<int32_t>> partTasks;
int32_t index = 0;
for (auto& task : tasks_) {
task.invoke();
partTasks[std::make_pair(task.spaceId_, task.partId_)].emplace_back(index++);
}
buckets_.resize(std::min(tasks_.size(), (size_t)FLAGS_task_concurrency));
for (auto it = partTasks.begin(); it != partTasks.end(); it++) {
size_t minNum = tasks_.size();
int32_t i = 0, minIndex = 0;
for (auto& bucket : buckets_) {
if (bucket.size() < minNum) {
minNum = bucket.size();
minIndex = i;
}
i++;
}
for (auto taskIndex : it->second) {
buckets_[minIndex].emplace_back(taskIndex);
}
}
saveInStore(true);
}

void BalancePlan::registerTaskCb() {
for (auto& task : tasks_) {
task.onFinished_ = [this]() {
bool finished = false;
{
std::lock_guard<std::mutex> lg(lock_);
finishedTaskNum_++;
if (finishedTaskNum_ == tasks_.size()) {
finished = true;
if (status_ == Status::IN_PROGRESS) {
status_ = Status::SUCCEEDED;
void BalancePlan::invoke() {
status_ = Status::IN_PROGRESS;
dispatchTasks();
for (size_t i = 0; i < buckets_.size(); i++) {
for (size_t j = 0; j < buckets_[i].size(); j++) {
auto taskIndex = buckets_[i][j];
tasks_[taskIndex].onFinished_ = [this, i, j]() {
bool finished = false;
{
std::lock_guard<std::mutex> lg(lock_);
finishedTaskNum_++;
if (finishedTaskNum_ == tasks_.size()) {
finished = true;
if (status_ == Status::IN_PROGRESS) {
status_ = Status::SUCCEEDED;
}
}
}
}
if (finished) {
saveInStore(true);
onFinished_();
}
};
task.onError_ = [this]() {
bool finished = false;
{
std::lock_guard<std::mutex> lg(lock_);
finishedTaskNum_++;
if (finishedTaskNum_ == tasks_.size()) {
finished = true;
if (finished) {
CHECK_EQ(j, this->buckets_[i].size() - 1);
saveInStore(true);
onFinished_();
} else {
if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
task.invoke();
}
}
}; // onFinished
tasks_[taskIndex].onError_ = [this, i, j]() {
bool finished = false;
{
std::lock_guard<std::mutex> lg(lock_);
finishedTaskNum_++;
status_ = Status::FAILED;
if (finishedTaskNum_ == tasks_.size()) {
finished = true;
}
}
}
if (finished) {
saveInStore(true);
onFinished_();
}
};
if (finished) {
CHECK_EQ(j, this->buckets_[i].size() - 1);
saveInStore(true);
onFinished_();
} else {
if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
task.invoke();
}
}
}; // onError
} // for (auto j = 0; j < buckets_[i].size(); j++)
} // for (auto i = 0; i < buckets_.size(); i++)

for (auto& bucket : buckets_) {
if (!bucket.empty()) {
tasks_[bucket[0]].invoke();
}
}
saveInStore(true);
}

bool BalancePlan::saveInStore(bool onlyPlan) {
Expand Down
12 changes: 7 additions & 5 deletions src/meta/processors/admin/BalancePlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class BalancePlan {
FRIEND_TEST(BalanceTest, BalancePlanTest);
FRIEND_TEST(BalanceTest, NormalTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, DispatchTasksTest);

public:
enum class Status : uint8_t {
Expand All @@ -42,11 +43,6 @@ class BalancePlan {
tasks_.emplace_back(std::move(task));
}

/**
* The method should be called after add all tasks into plan.
* */
void registerTaskCb();

void invoke();

/**
Expand All @@ -70,6 +66,8 @@ class BalancePlan {

std::string planVal() const;

void dispatchTasks();

static const std::string& prefix();

static BalanceID id(const folly::StringPiece& rawKey);
Expand All @@ -85,6 +83,10 @@ class BalancePlan {
size_t finishedTaskNum_ = 0;
std::function<void()> onFinished_;
Status status_ = Status::NOT_START;

// List of task index in tasks_;
using Bucket = std::vector<int32_t>;
std::vector<Bucket> buckets_;
};

} // namespace meta
Expand Down
2 changes: 0 additions & 2 deletions src/meta/processors/admin/Balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ bool Balancer::recovery() {
plan_->onFinished_();
return false;
}
plan_->registerTaskCb();
}
return true;
}
Expand Down Expand Up @@ -107,7 +106,6 @@ Status Balancer::buildBalancePlan() {
bool expected = true;
CHECK(running_.compare_exchange_strong(expected, false));
};
plan_->registerTaskCb();
if (plan_->tasks_.empty()) {
plan_->onFinished_();
return Status::Error("No Tasks");
Expand Down
27 changes: 25 additions & 2 deletions src/meta/processors/admin/Balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,35 @@ class Balancer {
StatusOr<BalanceID> balance();

/**
*
* */
* TODO(heng): Rollback some specific balance id
*/
Status rollback(BalanceID id) {
return Status::Error("unplemented, %ld", id);
}

/**
* TODO(heng): Only generate balance plan for our users.
* */
const BalancePlan* preview() {
return plan_.get();
}

/**
* TODO(heng): Execute balance plan from outside.
* */
Status execute(BalancePlan plan) {
UNUSED(plan);
return Status::Error("Unsupport it yet!");
}

/**
* TODO(heng): Execute specific balance plan by id.
* */
Status execute(BalanceID id) {
UNUSED(id);
return Status::Error("Unsupport it yet!");
}

private:
Balancer(kvstore::KVStore* kv, std::unique_ptr<AdminClient> client)
: kv_(kv)
Expand Down
105 changes: 95 additions & 10 deletions src/meta/test/BalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "fs/TempDir.h"
#include "meta/processors/partsMan/CreateSpaceProcessor.h"

DECLARE_uint32(task_concurrency);

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -238,9 +240,59 @@ TEST(BalanceTest, BalancePartsTest) {
}
}

TEST(BalanceTest, DispatchTasksTest) {
{
FLAGS_task_concurrency = 10;
BalancePlan plan(0L, nullptr, nullptr);
for (int i = 0; i < 20; i++) {
BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
plan.addTask(std::move(task));
}
plan.dispatchTasks();
// All tasks is about space 0, part 0.
// So they will be dispatched into the same bucket.
ASSERT_EQ(10, plan.buckets_.size());
ASSERT_EQ(20, plan.buckets_[0].size());
}
{
FLAGS_task_concurrency = 10;
BalancePlan plan(0L, nullptr, nullptr);
for (int i = 0; i < 5; i++) {
BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
plan.addTask(std::move(task));
}
plan.dispatchTasks();
ASSERT_EQ(5, plan.buckets_.size());
for (auto& bucket : plan.buckets_) {
ASSERT_EQ(1, bucket.size());
}
}
{
FLAGS_task_concurrency = 20;
BalancePlan plan(0L, nullptr, nullptr);
for (int i = 0; i < 5; i++) {
BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
plan.addTask(std::move(task));
}
for (int i = 0; i < 10; i++) {
BalanceTask task(0, 0, i, HostAddr(i, 2), HostAddr(i, 3), nullptr, nullptr);
plan.addTask(std::move(task));
}
plan.dispatchTasks();
ASSERT_EQ(15, plan.buckets_.size());
for (auto i = 0; i < 10; i++) {
ASSERT_LE(1, plan.buckets_[i].size());
ASSERT_GE(2, plan.buckets_[i].size());
}
for (auto i = 10; i < 15; i++) {
ASSERT_EQ(0, plan.buckets_[i].size());
}
}
}

TEST(BalanceTest, BalancePlanTest) {
{
LOG(INFO) << "Test with all tasks succeeded!";
LOG(INFO) << "Test with all tasks succeeded, only one bucket!";
BalancePlan plan(0L, nullptr, nullptr);
std::vector<Status> sts(7, Status::OK());
std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
Expand All @@ -257,20 +309,54 @@ TEST(BalanceTest, BalancePlanTest) {
ASSERT_EQ(10, plan.finishedTaskNum_);
b.post();
};
plan.registerTaskCb();
plan.invoke();
b.wait();
// All tasks is about space 0, part 0.
// So they will be dispatched into the same bucket.
ASSERT_EQ(10, plan.buckets_.size());
ASSERT_EQ(10, plan.buckets_[0].size());
for (auto i = 1; i < 10; i++) {
ASSERT_EQ(0, plan.buckets_[1].size());
}
}
{
LOG(INFO) << "Test with all tasks succeeded, 10 buckets!";
BalancePlan plan(0L, nullptr, nullptr);
std::vector<Status> sts(7, Status::OK());
std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
auto client = std::make_unique<AdminClient>(std::move(injector));

for (int i = 0; i < 10; i++) {
BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
task.client_ = client.get();
plan.addTask(std::move(task));
}
folly::Baton<true, std::atomic> b;
plan.onFinished_ = [&plan, &b] () {
ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_);
ASSERT_EQ(10, plan.finishedTaskNum_);
b.post();
};
plan.invoke();
b.wait();
// All tasks is about different parts.
// So they will be dispatched into different buckets.
ASSERT_EQ(10, plan.buckets_.size());
for (auto i = 0; i < 10; i++) {
ASSERT_EQ(1, plan.buckets_[1].size());
}
}
{
LOG(INFO) << "Test with one task failed!";
LOG(INFO) << "Test with one task failed, 10 buckets";
BalancePlan plan(0L, nullptr, nullptr);
std::unique_ptr<AdminClient> client1, client2;
{
std::vector<Status> sts(7, Status::OK());
std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
auto client = std::make_unique<AdminClient>(std::move(injector));
client1 = std::make_unique<AdminClient>(std::move(injector));
for (int i = 0; i < 9; i++) {
BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
task.client_ = client.get();
BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
task.client_ = client1.get();
plan.addTask(std::move(task));
}
}
Expand All @@ -284,18 +370,17 @@ TEST(BalanceTest, BalancePlanTest) {
Status::OK(),
Status::OK()};
std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
auto client = std::make_unique<AdminClient>(std::move(injector));
client2 = std::make_unique<AdminClient>(std::move(injector));
BalanceTask task(0, 0, 0, HostAddr(10, 0), HostAddr(10, 1), nullptr, nullptr);
task.client_ = client.get();
task.client_ = client2.get();
plan.addTask(std::move(task));
}
folly::Baton<true, std::atomic> b;
plan.onFinished_ = [&plan, &b] () {
ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_);
ASSERT_EQ(BalancePlan::Status::FAILED, plan.status_);
ASSERT_EQ(10, plan.finishedTaskNum_);
b.post();
};
plan.registerTaskCb();
plan.invoke();
b.wait();
}
Expand Down

0 comments on commit eba3bb9

Please sign in to comment.