From eba3bb9269d70ee2bcc0f933e2a5437985ce5d63 Mon Sep 17 00:00:00 2001
From: heng
Date: Thu, 20 Jun 2019 19:30:41 +0800
Subject: [PATCH] Address whitewum's comments and redesign the invoke for
 BalancePlan

---
 src/meta/processors/admin/BalancePlan.cpp | 111 +++++++++++++++-------
 src/meta/processors/admin/BalancePlan.h   |  12 ++-
 src/meta/processors/admin/Balancer.cpp    |   2 -
 src/meta/processors/admin/Balancer.h      |  27 +++++-
 src/meta/test/BalancerTest.cpp            | 105 ++++++++++++++++++--
 5 files changed, 203 insertions(+), 54 deletions(-)

diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp
index 855473f28f0..1c6954143d8 100644
--- a/src/meta/processors/admin/BalancePlan.cpp
+++ b/src/meta/processors/admin/BalancePlan.cpp
@@ -8,55 +8,96 @@
 #include
 #include "meta/processors/Common.h"
 
+DEFINE_uint32(task_concurrency, 10, "The number of tasks that can be invoked simultaneously");
+
 namespace nebula {
 namespace meta {
 
 const std::string kBalancePlanTable = "__b_plan__"; // NOLINT
 
-void BalancePlan::invoke() {
-    status_ = Status::IN_PROGRESS;
-    // TODO(heng) we want tasks for the same part to be invoked serially.
+void BalancePlan::dispatchTasks() {
+    // Key -> spaceID + partID, Val -> list of task indices in tasks_.
+    std::unordered_map<std::pair<GraphSpaceID, PartitionID>, std::vector<int32_t>> partTasks;
+    int32_t index = 0;
     for (auto& task : tasks_) {
-        task.invoke();
+        partTasks[std::make_pair(task.spaceId_, task.partId_)].emplace_back(index++);
+    }
+    buckets_.resize(std::min(tasks_.size(), (size_t)FLAGS_task_concurrency));
+    for (auto it = partTasks.begin(); it != partTasks.end(); it++) {
+        // Find the bucket holding the fewest tasks so far.
+        size_t minNum = tasks_.size();
+        int32_t i = 0, minIndex = 0;
+        for (auto& bucket : buckets_) {
+            if (bucket.size() < minNum) {
+                minNum = bucket.size();
+                minIndex = i;
+            }
+            i++;
+        }
+        // All tasks on the same part go into the same bucket,
+        // so that they will be invoked serially.
+        for (auto taskIndex : it->second) {
+            buckets_[minIndex].emplace_back(taskIndex);
+        }
     }
-    saveInStore(true);
 }
 
-void BalancePlan::registerTaskCb() {
-    for (auto& task : tasks_) {
-        task.onFinished_ = [this]() {
-            bool finished = false;
-            {
-                std::lock_guard<std::mutex> lg(lock_);
-                finishedTaskNum_++;
-                if (finishedTaskNum_ == tasks_.size()) {
-                    finished = true;
-                    if (status_ == Status::IN_PROGRESS) {
-                        status_ = Status::SUCCEEDED;
+void BalancePlan::invoke() {
+    status_ = Status::IN_PROGRESS;
+    dispatchTasks();
+    for (size_t i = 0; i < buckets_.size(); i++) {
+        for (size_t j = 0; j < buckets_[i].size(); j++) {
+            auto taskIndex = buckets_[i][j];
+            tasks_[taskIndex].onFinished_ = [this, i, j]() {
+                bool finished = false;
+                {
+                    std::lock_guard<std::mutex> lg(lock_);
+                    finishedTaskNum_++;
+                    if (finishedTaskNum_ == tasks_.size()) {
+                        finished = true;
+                        if (status_ == Status::IN_PROGRESS) {
+                            status_ = Status::SUCCEEDED;
+                        }
                     }
                 }
-            }
-            if (finished) {
-                saveInStore(true);
-                onFinished_();
-            }
-        };
-        task.onError_ = [this]() {
-            bool finished = false;
-            {
-                std::lock_guard<std::mutex> lg(lock_);
-                finishedTaskNum_++;
-                if (finishedTaskNum_ == tasks_.size()) {
-                    finished = true;
+                if (finished) {
+                    CHECK_EQ(j, this->buckets_[i].size() - 1);
+                    saveInStore(true);
+                    onFinished_();
+                } else {
+                    // Chain to the next task in the same bucket, if any.
+                    if (j + 1 < this->buckets_[i].size()) {
+                        auto& task = this->tasks_[this->buckets_[i][j + 1]];
+                        task.invoke();
+                    }
+                }
+            };  // onFinished
+            tasks_[taskIndex].onError_ = [this, i, j]() {
+                bool finished = false;
+                {
+                    std::lock_guard<std::mutex> lg(lock_);
+                    finishedTaskNum_++;
                     status_ = Status::FAILED;
+                    if (finishedTaskNum_ == tasks_.size()) {
+                        finished = true;
+                    }
                 }
-            }
-            if (finished) {
-                saveInStore(true);
-                onFinished_();
-            }
-        };
+                if (finished) {
+                    CHECK_EQ(j, this->buckets_[i].size() - 1);
+                    saveInStore(true);
+                    onFinished_();
+                } else {
+                    // Chain to the next task in the same bucket, if any.
+                    if (j + 1 < this->buckets_[i].size()) {
+                        auto& task = this->tasks_[this->buckets_[i][j + 1]];
+                        task.invoke();
+                    }
+                }
+            };  // onError
+        }  // for (auto j = 0; j < buckets_[i].size(); j++)
+    }  // for (auto i = 0; i < buckets_.size(); i++)
+
+    // Only the first task of every bucket is invoked directly;
+    // the rest are chained through the callbacks above.
+    for (auto& bucket : buckets_) {
+        if (!bucket.empty()) {
+            tasks_[bucket[0]].invoke();
+        }
+    }
+    saveInStore(true);
 }
 
 bool BalancePlan::saveInStore(bool onlyPlan) {
diff --git a/src/meta/processors/admin/BalancePlan.h b/src/meta/processors/admin/BalancePlan.h
index 6a2d8e47a7a..f52b031c960 100644
--- a/src/meta/processors/admin/BalancePlan.h
+++ b/src/meta/processors/admin/BalancePlan.h
@@ -19,6 +19,7 @@ class BalancePlan {
     FRIEND_TEST(BalanceTest, BalancePlanTest);
     FRIEND_TEST(BalanceTest, NormalTest);
     FRIEND_TEST(BalanceTest, RecoveryTest);
+    FRIEND_TEST(BalanceTest, DispatchTasksTest);
 
 public:
     enum class Status : uint8_t {
@@ -42,11 +43,6 @@ class BalancePlan {
         tasks_.emplace_back(std::move(task));
     }
 
-    /**
-     * The method should be called after adding all tasks into the plan.
-     * */
-    void registerTaskCb();
-
     void invoke();
 
     /**
@@ -70,6 +66,8 @@ class BalancePlan {
 
     std::string planVal() const;
 
+    void dispatchTasks();
+
    static const std::string& prefix();
 
     static BalanceID id(const folly::StringPiece& rawKey);
@@ -85,6 +83,10 @@ class BalancePlan {
     size_t finishedTaskNum_ = 0;
     std::function<void()> onFinished_;
     Status status_ = Status::NOT_START;
+
+    // Each bucket holds a list of task indices in tasks_.
+    using Bucket = std::vector<int32_t>;
+    std::vector<Bucket> buckets_;
 };
 
 }  // namespace meta
diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp
index 38fca08ed39..a04fdea5e11 100644
--- a/src/meta/processors/admin/Balancer.cpp
+++ b/src/meta/processors/admin/Balancer.cpp
@@ -71,7 +71,6 @@ bool Balancer::recovery() {
             plan_->onFinished_();
             return false;
         }
-        plan_->registerTaskCb();
     }
     return true;
 }
@@ -107,7 +106,6 @@ Status Balancer::buildBalancePlan() {
         bool expected = true;
         CHECK(running_.compare_exchange_strong(expected, false));
     };
-    plan_->registerTaskCb();
     if (plan_->tasks_.empty()) {
         plan_->onFinished_();
         return Status::Error("No Tasks");
diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h
index 5b4bdea2fb8..8a41c81dd73 100644
--- a/src/meta/processors/admin/Balancer.h
+++ b/src/meta/processors/admin/Balancer.h
@@ -57,12 +57,35 @@ class Balancer {
     StatusOr<BalanceID> balance();
 
     /**
-     *
-     * */
+     * TODO(heng): Roll back the balance plan with the given id.
+     */
     Status rollback(BalanceID id) {
         return Status::Error("unimplemented, %ld", id);
     }
 
+    /**
+     * TODO(heng): Only generate the balance plan, so that users can preview it.
+     * */
+    const BalancePlan* preview() {
+        return plan_.get();
+    }
+
+    /**
+     * TODO(heng): Execute a balance plan passed in from outside.
+     * */
+    Status execute(BalancePlan plan) {
+        UNUSED(plan);
+        return Status::Error("Not supported yet!");
+    }
+
+    /**
+     * TODO(heng): Execute the balance plan with the given id.
+     * */
+    Status execute(BalanceID id) {
+        UNUSED(id);
+        return Status::Error("Not supported yet!");
+    }
+
 private:
     Balancer(kvstore::KVStore* kv, std::unique_ptr<AdminClient> client)
         : kv_(kv)
diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp
index 48f4661d9e9..2907c1ad044 100644
--- a/src/meta/test/BalancerTest.cpp
+++ b/src/meta/test/BalancerTest.cpp
@@ -12,6 +12,8 @@
 #include
 #include "fs/TempDir.h"
 #include "meta/processors/partsMan/CreateSpaceProcessor.h"
 
+DECLARE_uint32(task_concurrency);
+
 namespace nebula {
 namespace meta {
 
@@ -238,9 +240,59 @@ TEST(BalanceTest, BalancePartsTest) {
     }
 }
 
+TEST(BalanceTest, DispatchTasksTest) {
+    {
+        FLAGS_task_concurrency = 10;
+        BalancePlan plan(0L, nullptr, nullptr);
+        for (int i = 0; i < 20; i++) {
+            BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+            plan.addTask(std::move(task));
+        }
+        plan.dispatchTasks();
+        // All tasks are for space 0, part 0,
+        // so they will be dispatched into the same bucket.
+        ASSERT_EQ(10, plan.buckets_.size());
+        ASSERT_EQ(20, plan.buckets_[0].size());
+    }
+    {
+        FLAGS_task_concurrency = 10;
+        BalancePlan plan(0L, nullptr, nullptr);
+        for (int i = 0; i < 5; i++) {
+            BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+            plan.addTask(std::move(task));
+        }
+        plan.dispatchTasks();
+        ASSERT_EQ(5, plan.buckets_.size());
+        for (auto& bucket : plan.buckets_) {
+            ASSERT_EQ(1, bucket.size());
+        }
+    }
+    {
+        FLAGS_task_concurrency = 20;
+        BalancePlan plan(0L, nullptr, nullptr);
+        for (int i = 0; i < 5; i++) {
+            BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+            plan.addTask(std::move(task));
+        }
+        for (int i = 0; i < 10; i++) {
+            BalanceTask task(0, 0, i, HostAddr(i, 2), HostAddr(i, 3), nullptr, nullptr);
+            plan.addTask(std::move(task));
+        }
+        plan.dispatchTasks();
+        // Parts 0-4 get two tasks each and parts 5-9 get one,
+        // so ten buckets are used and the last five stay empty.
+        ASSERT_EQ(15, plan.buckets_.size());
+        for (auto i = 0; i < 10; i++) {
+            ASSERT_LE(1, plan.buckets_[i].size());
+            ASSERT_GE(2, plan.buckets_[i].size());
+        }
+        for (auto i = 10; i < 15; i++) {
+            ASSERT_EQ(0, plan.buckets_[i].size());
+        }
+    }
+}
+
 TEST(BalanceTest, BalancePlanTest) {
     {
-        LOG(INFO) << "Test with all tasks succeeded!";
+        LOG(INFO) << "Test with all tasks succeeded, only one bucket!";
         BalancePlan plan(0L, nullptr, nullptr);
         std::vector<Status> sts(7, Status::OK());
         std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
         auto client = std::make_unique<AdminClient>(std::move(injector));
@@ -257,20 +309,54 @@ TEST(BalanceTest, BalancePlanTest) {
             ASSERT_EQ(10, plan.finishedTaskNum_);
             b.post();
         };
-        plan.registerTaskCb();
         plan.invoke();
         b.wait();
+        // All tasks are for space 0, part 0,
+        // so they will be dispatched into the same bucket.
+        ASSERT_EQ(10, plan.buckets_.size());
+        ASSERT_EQ(10, plan.buckets_[0].size());
+        for (auto i = 1; i < 10; i++) {
+            ASSERT_EQ(0, plan.buckets_[i].size());
+        }
+    }
+    {
+        LOG(INFO) << "Test with all tasks succeeded, 10 buckets!";
+        BalancePlan plan(0L, nullptr, nullptr);
+        std::vector<Status> sts(7, Status::OK());
+        std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+        auto client = std::make_unique<AdminClient>(std::move(injector));
+
+        for (int i = 0; i < 10; i++) {
+            BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+            task.client_ = client.get();
+            plan.addTask(std::move(task));
+        }
+        folly::Baton<> b;
+        plan.onFinished_ = [&plan, &b] () {
+            ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_);
+            ASSERT_EQ(10, plan.finishedTaskNum_);
+            b.post();
+        };
+        plan.invoke();
+        b.wait();
+        // All tasks are for different parts,
+        // so they will be dispatched into different buckets.
+        ASSERT_EQ(10, plan.buckets_.size());
+        for (auto i = 0; i < 10; i++) {
+            ASSERT_EQ(1, plan.buckets_[i].size());
+        }
+    }
     {
-        LOG(INFO) << "Test with one task failed!";
+        LOG(INFO) << "Test with one task failed, 10 buckets!";
         BalancePlan plan(0L, nullptr, nullptr);
+        std::unique_ptr<AdminClient> client1, client2;
         {
             std::vector<Status> sts(7, Status::OK());
             std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
-            auto client = std::make_unique<AdminClient>(std::move(injector));
+            client1 = std::make_unique<AdminClient>(std::move(injector));
             for (int i = 0; i < 9; i++) {
-                BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
-                task.client_ = client.get();
+                BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+                task.client_ = client1.get();
                 plan.addTask(std::move(task));
             }
         }
@@ -284,18 +370,17 @@
                 Status::OK(),
                 Status::OK()};
             std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
-            auto client = std::make_unique<AdminClient>(std::move(injector));
+            client2 = std::make_unique<AdminClient>(std::move(injector));
             BalanceTask task(0, 0, 0, HostAddr(10, 0), HostAddr(10, 1), nullptr, nullptr);
-            task.client_ = client.get();
+            task.client_ = client2.get();
             plan.addTask(std::move(task));
         }
         folly::Baton<> b;
        plan.onFinished_ = [&plan, &b] () {
-            ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_);
+            ASSERT_EQ(BalancePlan::Status::FAILED, plan.status_);
             ASSERT_EQ(10, plan.finishedTaskNum_);
             b.post();
         };
-        plan.registerTaskCb();
         plan.invoke();
         b.wait();
     }
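
Note for reviewers: to make the dispatch strategy easier to experiment with in isolation, here is a minimal standalone sketch of the same greedy grouping that dispatchTasks() implements. The Task struct and dispatch() function below are simplified stand-ins for illustration, not the real BalanceTask/BalancePlan types, and std::map stands in for std::unordered_map so the pair key needs no custom hash:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <utility>
#include <vector>

// Hypothetical stand-in: only the two fields the dispatch logic reads.
struct Task {
    int32_t spaceId;
    int32_t partId;
};

// Group tasks by (space, part), then hand each group to the currently
// least-loaded bucket, so tasks on the same part always share a bucket.
std::vector<std::vector<size_t>> dispatch(const std::vector<Task>& tasks,
                                          size_t concurrency) {
    std::map<std::pair<int32_t, int32_t>, std::vector<size_t>> partTasks;
    for (size_t idx = 0; idx < tasks.size(); idx++) {
        partTasks[{tasks[idx].spaceId, tasks[idx].partId}].push_back(idx);
    }
    std::vector<std::vector<size_t>> buckets(
        std::min(tasks.size(), concurrency));
    for (const auto& entry : partTasks) {
        // Pick the bucket holding the fewest tasks so far.
        auto minIt = std::min_element(
            buckets.begin(), buckets.end(),
            [](const auto& a, const auto& b) { return a.size() < b.size(); });
        for (auto idx : entry.second) {
            minIt->push_back(idx);
        }
    }
    return buckets;
}

int main() {
    // Five tasks: part 0 twice, part 1 twice, part 2 once.
    std::vector<Task> tasks = {{0, 0}, {0, 0}, {0, 1}, {0, 2}, {0, 1}};
    auto buckets = dispatch(tasks, 2);
    for (size_t b = 0; b < buckets.size(); b++) {
        std::cout << "bucket " << b << ":";
        for (auto idx : buckets[b]) {
            std::cout << " task" << idx;
        }
        std::cout << "\n";
    }
    return 0;
}

With this input the part 0 and part 1 groups land in different buckets, and the part 2 task joins whichever bucket is least loaded at that point, mirroring how the patch bounds concurrency by min(tasks_.size(), FLAGS_task_concurrency).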
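And a matching sketch of the new invocation scheme: invoke() wires each task's completion callback to start the next task in its bucket, then kicks off only the head of every bucket, so buckets run concurrently while tasks inside one bucket stay strictly serial. Task here is again a toy stand-in; the real code additionally distinguishes onFinished_ from onError_ and updates the plan status under a mutex:

#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

// Hypothetical stand-in: "running" a task just prints its id and then
// fires the completion callback, like BalanceTask invoking onFinished_.
struct Task {
    int id;
    std::function<void()> onFinished;
    void invoke() {
        std::cout << "task " << id << " done\n";
        if (onFinished) {
            onFinished();
        }
    }
};

int main() {
    std::vector<Task> tasks{{0, {}}, {1, {}}, {2, {}}, {3, {}}};
    // Two buckets of indices into tasks: tasks 0 and 2 must run serially,
    // and so must tasks 1 and 3.
    std::vector<std::vector<size_t>> buckets = {{0, 2}, {1, 3}};

    // Wire each task's callback to start the next task in the same bucket.
    for (auto& bucket : buckets) {
        for (size_t j = 0; j + 1 < bucket.size(); j++) {
            Task& next = tasks[bucket[j + 1]];
            tasks[bucket[j]].onFinished = [&next]() { next.invoke(); };
        }
    }
    // Invoke only the head of each bucket; the rest run through the chain.
    for (auto& bucket : buckets) {
        if (!bucket.empty()) {
            tasks[bucket[0]].invoke();
        }
    }
    return 0;
}

In the patch itself the same chaining happens inside both onFinished_ and onError_, which is why BalancePlan::invoke() only calls tasks_[bucket[0]].invoke() directly at the end.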