From 2a441c5e10bc9a6d53b18731067419be0cd165b0 Mon Sep 17 00:00:00 2001
From: heng <heng.chen@vesoft.com>
Date: Fri, 24 May 2019 14:40:51 +0800
Subject: [PATCH] Implement balance logic in meta

---
 src/common/time/TimeUtils.h                   |   1 +
 src/interface/meta.thrift                     |  18 +-
 src/interface/storage.thrift                  |  24 +
 src/meta/CMakeLists.txt                       |   7 +-
 src/meta/MetaServiceHandler.cpp               |   6 +
 src/meta/MetaServiceHandler.h                 |  15 +-
 src/meta/MetaServiceUtils.cpp                 |   1 +
 src/meta/client/MetaClient.cpp                |   9 +
 src/meta/client/MetaClient.h                  |   6 +-
 src/meta/processors/BaseProcessor.h           |  22 +-
 src/meta/processors/Common.h                  |  42 ++
 src/meta/processors/admin/AdminClient.cpp     |  96 +++
 src/meta/processors/admin/AdminClient.h       |  74 +++
 src/meta/processors/admin/BalancePlan.cpp     | 157 +++++
 src/meta/processors/admin/BalancePlan.h       |  93 +++
 .../processors/admin/BalanceProcessor.cpp     |  49 ++
 src/meta/processors/admin/BalanceProcessor.h  |  34 ++
 src/meta/processors/admin/BalanceTask.cpp     | 234 ++++++++
 src/meta/processors/admin/BalanceTask.h       | 114 ++++
 src/meta/processors/admin/Balancer.cpp        | 318 ++++++++++
 src/meta/processors/admin/Balancer.h          | 119 ++++
 src/meta/test/BalancerTest.cpp                | 548 ++++++++++++++++++
 src/meta/test/CMakeLists.txt                  |  33 +-
 23 files changed, 1986 insertions(+), 34 deletions(-)
 create mode 100644 src/meta/processors/Common.h
 create mode 100644 src/meta/processors/admin/AdminClient.cpp
 create mode 100644 src/meta/processors/admin/AdminClient.h
 create mode 100644 src/meta/processors/admin/BalancePlan.cpp
 create mode 100644 src/meta/processors/admin/BalancePlan.h
 create mode 100644 src/meta/processors/admin/BalanceProcessor.cpp
 create mode 100644 src/meta/processors/admin/BalanceProcessor.h
 create mode 100644 src/meta/processors/admin/BalanceTask.cpp
 create mode 100644 src/meta/processors/admin/BalanceTask.h
 create mode 100644 src/meta/processors/admin/Balancer.cpp
 create mode 100644 src/meta/processors/admin/Balancer.h
 create mode 100644 src/meta/test/BalancerTest.cpp

diff --git a/src/common/time/TimeUtils.h b/src/common/time/TimeUtils.h
index 387134923cb..66fde102e92 100644
--- a/src/common/time/TimeUtils.h
+++ b/src/common/time/TimeUtils.h
@@ -7,6 +7,7 @@
 #ifndef COMMON_TIME_TIMEUTILS_H_
 #define COMMON_TIME_TIMEUTILS_H_
 
+#include "base/Base.h"
 #include <sys/time.h>
 
 namespace nebula {
diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift
index f42c6d85edc..e7de25a3b2e 100644
--- a/src/interface/meta.thrift
+++ b/src/interface/meta.thrift
@@ -26,6 +26,7 @@ enum ErrorCode {
     E_NOT_FOUND      = -23,
     E_INVALID_HOST   = -24,
     E_UNSUPPORTED    = -25,
+    E_BALANCER_RUNNING = -26,
 
     // KV Failure
     E_STORE_FAILURE          = -31,
@@ -416,6 +417,19 @@ struct CheckPasswordReq {
     2: string encoded_pwd,
 }
 
+struct BalanceReq {
+    1: optional common.GraphSpaceID space_id,
+    // Specify the balance id to check the status of the related balance plan
+    2: optional i64 id,
+}
+
+struct BalanceResp {
+    1: ErrorCode        code,
+    2: i64              id,
+    // Valid if code equals E_LEADER_CHANGED.
+    3: common.HostAddr  leader,
+}
+
 service MetaService {
     ExecResp createSpace(1: CreateSpaceReq req);
     ExecResp dropSpace(1: DropSpaceReq req);
@@ -447,8 +461,6 @@ service MetaService {
     ExecResp removeRange(1: RemoveRangeReq req);
     ScanResp scan(1: ScanReq req);
 
-    HBResp           heartBeat(1: HBReq req);
-
     ExecResp createUser(1: CreateUserReq req);
     ExecResp dropUser(1: DropUserReq req);
     ExecResp alterUser(1: AlterUserReq req);
@@ -460,5 +472,7 @@ service MetaService {
     ExecResp changePassword(1: ChangePasswordReq req);
     ExecResp checkPassword(1: CheckPasswordReq req);
 
+    HBResp           heartBeat(1: HBReq req);
+    BalanceResp      balance(1: BalanceReq req);
 }
 
diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift
index fafa4cf21d0..486fde9c96f 100644
--- a/src/interface/storage.thrift
+++ b/src/interface/storage.thrift
@@ -160,6 +160,25 @@ struct AddEdgesRequest {
     3: bool overwritable,
 }
 
+struct AdminExecResp {
+
+}
+
+struct AddPartReq {
+    1: common.GraphSpaceID space_id,
+    2: common.PartitionID  part_id,
+}
+
+struct RemovePartReq {
+    1: common.GraphSpaceID space_id,
+    2: common.PartitionID  part_id,
+}
+
+struct MemberChangeReq {
+    1: common.GraphSpaceID space_id,
+    2: common.PartitionID  part_id,
+}
+
 service StorageService {
     QueryResponse getOutBound(1: GetNeighborsRequest req)
     QueryResponse getInBound(1: GetNeighborsRequest req)
@@ -173,5 +192,10 @@ service StorageService {
 
     ExecResponse addVertices(1: AddVerticesRequest req);
     ExecResponse addEdges(1: AddEdgesRequest req);
+
+    // Interfaces for admin operations
+    AdminExecResp addPart(1: AddPartReq req);
+    AdminExecResp removePart(1: RemovePartReq req);
+    AdminExecResp memberChange(1: MemberChangeReq req);
 }
 
diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt
index 7d35f54134f..1ebfbeaaab6 100644
--- a/src/meta/CMakeLists.txt
+++ b/src/meta/CMakeLists.txt
@@ -38,7 +38,13 @@ add_library(
     processors/customKV/ScanProcessor.cpp
     processors/admin/HBProcessor.cpp
     processors/usersMan/AuthenticationProcessor.cpp
+    processors/admin/BalanceProcessor.cpp
+    processors/admin/Balancer.cpp
+    processors/admin/BalancePlan.cpp
+    processors/admin/BalanceTask.cpp
+    processors/admin/AdminClient.cpp
 )
+
 add_dependencies(
     meta_service_handler
     base_obj
@@ -69,5 +75,4 @@ add_dependencies(
     common_thrift_obj
     schema_obj)
 
-
 add_subdirectory(test)
diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp
index c3cde610ef0..64cec63742c 100644
--- a/src/meta/MetaServiceHandler.cpp
+++ b/src/meta/MetaServiceHandler.cpp
@@ -32,6 +32,7 @@
 #include "meta/processors/customKV/RemoveRangeProcessor.h"
 #include "meta/processors/admin/HBProcessor.h"
 #include "meta/processors/usersMan/AuthenticationProcessor.h"
+#include "meta/processors/admin/BalanceProcessor.h"
 
 #define RETURN_FUTURE(processor) \
     auto f = processor->getFuture(); \
@@ -251,5 +252,10 @@ MetaServiceHandler::future_checkPassword(const cpp2::CheckPasswordReq& req) {
     RETURN_FUTURE(processor);
 }
 
+folly::Future<cpp2::BalanceResp>
+MetaServiceHandler::future_balance(const cpp2::BalanceReq& req) {
+    auto* processor = BalanceProcessor::instance(kvstore_);
+    RETURN_FUTURE(processor);
+}
 }  // namespace meta
 }  // namespace nebula
diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h
index b679da93340..7d78a75ae7f 100644
--- a/src/meta/MetaServiceHandler.h
+++ b/src/meta/MetaServiceHandler.h
@@ -101,12 +101,6 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
     folly::Future<cpp2::ListEdgesResp>
     future_listEdges(const cpp2::ListEdgesReq& req) override;
 
-    /**
-     * HeartBeat
-     * */
-    folly::Future<cpp2::HBResp>
-    future_heartBeat(const cpp2::HBReq& req) override;
-
     /**
      * User manager
      **/
@@ -140,6 +134,15 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
     folly::Future<cpp2::ExecResp>
     future_checkPassword(const cpp2::CheckPasswordReq& req) override;
 
+    /**
+     * HeartBeat
+     * */
+    folly::Future<cpp2::HBResp>
+    future_heartBeat(const cpp2::HBReq& req) override;
+
+    folly::Future<cpp2::BalanceResp>
+    future_balance(const cpp2::BalanceReq& req) override;
+
 private:
     kvstore::KVStore* kvstore_ = nullptr;
 };
diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp
index 2ed8292d164..76a5169b247 100644
--- a/src/meta/MetaServiceUtils.cpp
+++ b/src/meta/MetaServiceUtils.cpp
@@ -20,6 +20,7 @@ const std::string kIndexTable  = "__index__";   // NOLINT
 const std::string kUsersTable  = "__users__";    // NOLINT
 const std::string kRolesTable  = "__roles__";    // NOLINT
 
+
 const std::string kHostOnline = "Online";       // NOLINT
 const std::string kHostOffline = "Offline";     // NOLINT
 
diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp
index 2abad555996..d13a0465b81 100644
--- a/src/meta/client/MetaClient.cpp
+++ b/src/meta/client/MetaClient.cpp
@@ -957,5 +957,14 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
         return resp.code == cpp2::ErrorCode::SUCCEEDED;
     }, true);
 }
+
+folly::Future<StatusOr<int64_t>> MetaClient::balance() {
+    cpp2::BalanceReq req;
+    return getResponse(std::move(req), [] (auto client, auto request) {
+        return client->future_balance(request);
+    }, [] (cpp2::BalanceResp&& resp) -> int64_t {
+        return resp.id;
+    }, true);
+}
 }  // namespace meta
 }  // namespace nebula
diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h
index 646a673d90b..802982ccf77 100644
--- a/src/meta/client/MetaClient.h
+++ b/src/meta/client/MetaClient.h
@@ -157,7 +157,11 @@ class MetaClient {
     folly::Future<StatusOr<bool>>
     removeRange(std::string segment, std::string start, std::string end);
 
-    // Operations for cache.
+    // Operations for admin
+    folly::Future<StatusOr<int64_t>>
+    balance();
+
+    // Opeartions for cache.
     StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);
 
     StatusOr<TagID> getTagIDByNameFromCache(const GraphSpaceID& space, const std::string& name);
diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h
index 0835663112b..8b449f6db49 100644
--- a/src/meta/processors/BaseProcessor.h
+++ b/src/meta/processors/BaseProcessor.h
@@ -17,33 +17,13 @@
 #include "meta/MetaServiceUtils.h"
 #include "meta/common/MetaCommon.h"
 #include "network/NetworkUtils.h"
+#include "meta/processors/Common.h"
 
 namespace nebula {
 namespace meta {
 
 using nebula::network::NetworkUtils;
 
-const PartitionID kDefaultPartId = 0;
-const GraphSpaceID kDefaultSpaceId = 0;
-
-class LockUtils {
-public:
-    LockUtils() = delete;
-#define GENERATE_LOCK(Entry) \
-    static folly::SharedMutex& Entry##Lock() { \
-        static folly::SharedMutex l; \
-        return l; \
-    }
-
-GENERATE_LOCK(space);
-GENERATE_LOCK(id);
-GENERATE_LOCK(tag);
-GENERATE_LOCK(edge);
-GENERATE_LOCK(user);
-
-#undef GENERATE_LOCK
-};
-
 #define CHECK_SPACE_ID_AND_RETURN(spaceID) \
     if (spaceExist(spaceID) == Status::SpaceNotFound()) { \
         resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h
new file mode 100644
index 00000000000..9dcda6a4263
--- /dev/null
+++ b/src/meta/processors/Common.h
@@ -0,0 +1,42 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef META_PROCESSORS_COMMON_H_
+#define META_PROCESSORS_COMMON_H_
+
+#include "base/Base.h"
+
+namespace nebula {
+namespace meta {
+
+static const PartitionID kDefaultPartId = 0;
+static const GraphSpaceID kDefaultSpaceId = 0;
+
+using BalanceID = int64_t;
+
+class LockUtils {
+public:
+    LockUtils() = delete;
+#define GENERATE_LOCK(Entry) \
+    static folly::SharedMutex& Entry##Lock() { \
+        static folly::SharedMutex l; \
+        return l; \
+    }
+
+GENERATE_LOCK(space);
+GENERATE_LOCK(id);
+GENERATE_LOCK(tag);
+GENERATE_LOCK(edge);
+GENERATE_LOCK(user);
+
+#undef GENERATE_LOCK
+};
+
+
+}  // namespace meta
+}  // namespace nebula
+#endif  // META_PROCESSORS_COMMON_H_
+
diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp
new file mode 100644
index 00000000000..1014893b7b6
--- /dev/null
+++ b/src/meta/processors/admin/AdminClient.cpp
@@ -0,0 +1,96 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "meta/processors/admin/AdminClient.h"
+
+namespace nebula {
+namespace meta {
+
+folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
+                                                      PartitionID partId,
+                                                      const HostAddr& leader,
+                                                      const HostAddr& dst) {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    UNUSED(leader);
+    UNUSED(dst);
+    if (injector_) {
+        return injector_->transLeader();
+    }
+    return Status::OK();
+}
+
+folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
+                                                  PartitionID partId,
+                                                  const HostAddr& host,
+                                                  bool asLearner) {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    UNUSED(host);
+    UNUSED(asLearner);
+    if (injector_) {
+        return injector_->addPart();
+    }
+    return Status::OK();
+}
+
+folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    if (injector_) {
+        return injector_->addLearner();
+    }
+    return Status::OK();
+}
+
+folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
+                                                                PartitionID partId) {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    if (injector_) {
+        return injector_->waitingForCatchUpData();
+    }
+    return Status::OK();
+}
+
+folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    if (injector_) {
+        return injector_->memberChange();
+    }
+    return Status::OK();
+}
+
+folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
+                                              PartitionID partId,
+                                              const HostAddr& leader,
+                                              const HostAddr& dst) {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    UNUSED(leader);
+    UNUSED(dst);
+    if (injector_) {
+        return injector_->updateMeta();
+    }
+    return Status::OK();
+}
+
+folly::Future<Status> AdminClient::removePart(GraphSpaceID spaceId,
+                                                     PartitionID partId,
+                                                     const HostAddr& host) {
+    UNUSED(spaceId);
+    UNUSED(partId);
+    UNUSED(host);
+    if (injector_) {
+        return injector_->removePart();
+    }
+    return Status::OK();
+}
+
+}  // namespace meta
+}  // namespace nebula
+
diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h
new file mode 100644
index 00000000000..cd0e3f60df4
--- /dev/null
+++ b/src/meta/processors/admin/AdminClient.h
@@ -0,0 +1,74 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_
+#define META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_
+
+#include "base/Base.h"
+#include <folly/executors/IOThreadPoolExecutor.h>
+#include "base/Status.h"
+#include "thrift/ThriftClientManager.h"
+
+namespace nebula {
+namespace meta {
+
+class FaultInjector {
+public:
+    virtual ~FaultInjector() = default;
+    virtual folly::Future<Status> transLeader() = 0;
+    virtual folly::Future<Status> addPart() = 0;
+    virtual folly::Future<Status> addLearner() = 0;
+    virtual folly::Future<Status> waitingForCatchUpData() = 0;
+    virtual folly::Future<Status> memberChange() = 0;
+    virtual folly::Future<Status> updateMeta() = 0;
+    virtual folly::Future<Status> removePart() = 0;
+};
+
+class AdminClient {
+public:
+    AdminClient() = default;
+
+    explicit AdminClient(std::unique_ptr<FaultInjector> injector)
+        : injector_(std::move(injector)) {}
+
+    ~AdminClient() = default;
+
+    folly::Future<Status> transLeader(GraphSpaceID spaceId,
+                                      PartitionID partId,
+                                      const HostAddr& leader,
+                                      const HostAddr& dst);
+
+    folly::Future<Status> addPart(GraphSpaceID spaceId,
+                                  PartitionID partId,
+                                  const HostAddr& host,
+                                  bool asLearner);
+
+    folly::Future<Status> addLearner(GraphSpaceID spaceId, PartitionID partId);
+
+    folly::Future<Status> waitingForCatchUpData(GraphSpaceID spaceId, PartitionID partId);
+
+    folly::Future<Status> memberChange(GraphSpaceID spaceId, PartitionID partId);
+
+    folly::Future<Status> updateMeta(GraphSpaceID spaceId,
+                                     PartitionID partId,
+                                     const HostAddr& leader,
+                                     const HostAddr& dst);
+
+    folly::Future<Status> removePart(GraphSpaceID spaceId,
+                                     PartitionID partId,
+                                     const HostAddr& host);
+
+    FaultInjector* faultInjector() {
+        return injector_.get();
+    }
+
+private:
+    std::unique_ptr<FaultInjector> injector_{nullptr};
+};
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_
diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp
new file mode 100644
index 00000000000..855473f28f0
--- /dev/null
+++ b/src/meta/processors/admin/BalancePlan.cpp
@@ -0,0 +1,157 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "meta/processors/admin/BalancePlan.h"
+#include <folly/synchronization/Baton.h>
+#include "meta/processors/Common.h"
+
+namespace nebula {
+namespace meta {
+
+const std::string kBalancePlanTable = "__b_plan__"; // NOLINT
+
+void BalancePlan::invoke() {
+    status_ = Status::IN_PROGRESS;
+    // TODO(heng) we want tasks for the same part to be invoked serially.
+    for (auto& task : tasks_) {
+        task.invoke();
+    }
+    saveInStore(true);
+}
+
+void BalancePlan::registerTaskCb() {
+    for (auto& task : tasks_) {
+        task.onFinished_ = [this]() {
+            bool finished = false;
+            {
+                std::lock_guard<std::mutex> lg(lock_);
+                finishedTaskNum_++;
+                if (finishedTaskNum_ == tasks_.size()) {
+                    finished = true;
+                    if (status_ == Status::IN_PROGRESS) {
+                        status_ = Status::SUCCEEDED;
+                    }
+                }
+            }
+            if (finished) {
+                saveInStore(true);
+                onFinished_();
+            }
+        };
+        task.onError_ = [this]() {
+            bool finished = false;
+            {
+                std::lock_guard<std::mutex> lg(lock_);
+                finishedTaskNum_++;
+                if (finishedTaskNum_ == tasks_.size()) {
+                    finished = true;
+                    status_ = Status::FAILED;
+                }
+            }
+            if (finished) {
+                saveInStore(true);
+                onFinished_();
+            }
+        };
+    }
+}
+
+bool BalancePlan::saveInStore(bool onlyPlan) {
+    if (kv_) {
+        std::vector<kvstore::KV> data;
+        data.emplace_back(planKey(), planVal());
+        if (!onlyPlan) {
+            for (auto& task : tasks_) {
+                data.emplace_back(task.taskKey(), task.taskVal());
+            }
+        }
+        folly::Baton<true, std::atomic> baton;
+        bool ret = false;
+        kv_->asyncMultiPut(kDefaultSpaceId,
+                           kDefaultPartId,
+                           std::move(data),
+                           [this, &baton, &ret] (kvstore::ResultCode code) {
+            if (kvstore::ResultCode::SUCCEEDED == code) {
+                ret = true;
+            } else {
+                LOG(ERROR) << "Can't write the kvstore, ret = " << static_cast<int32_t>(code);
+            }
+            baton.post();
+        });
+        baton.wait();
+        return ret;
+    }
+    return true;
+}
+
+bool BalancePlan::recovery() {
+    if (kv_) {
+        const auto& prefix = BalanceTask::prefix(id_);
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        if (ret != kvstore::ResultCode::SUCCEEDED) {
+            LOG(ERROR) << "Can't access kvstore, ret = " << static_cast<int32_t>(ret);
+            return false;
+        }
+        while (iter->valid()) {
+            BalanceTask task;
+            task.kv_ = kv_;
+            task.client_ = client_;
+            {
+                auto tup = BalanceTask::parseKey(iter->key());
+                task.balanceId_ = std::get<0>(tup);
+                task.spaceId_ = std::get<1>(tup);
+                task.partId_ = std::get<2>(tup);
+                task.src_ = std::get<3>(tup);
+                task.dst_ = std::get<4>(tup);
+            }
+            {
+                auto tup = BalanceTask::parseVal(iter->val());
+                task.status_ = std::get<0>(tup);
+                task.ret_ = std::get<1>(tup);
+                if (task.ret_ == BalanceTask::Result::FAILED) {
+                    // Resume the failed task.
+                    task.ret_ = BalanceTask::Result::IN_PROGRESS;
+                }
+                task.startTimeMs_ = std::get<2>(tup);
+                task.endTimeMs_ = std::get<3>(tup);
+            }
+            tasks_.emplace_back(std::move(task));
+            iter->next();
+        }
+    }
+    return true;
+}
+
+std::string BalancePlan::planKey() const {
+    std::string str;
+    str.reserve(48);
+    str.append(reinterpret_cast<const char*>(kBalancePlanTable.data()), kBalancePlanTable.size());
+    str.append(reinterpret_cast<const char*>(&id_), sizeof(id_));
+    return str;
+}
+
+std::string BalancePlan::planVal() const {
+    std::string str;
+    str.append(reinterpret_cast<const char*>(&status_), sizeof(status_));
+    return str;
+}
+
+const std::string& BalancePlan::prefix() {
+    return kBalancePlanTable;
+}
+
+BalanceID BalancePlan::id(const folly::StringPiece& rawKey) {
+    return *reinterpret_cast<const BalanceID*>(rawKey.begin() + kBalancePlanTable.size());
+}
+
+BalancePlan::Status BalancePlan::status(const folly::StringPiece& rawVal) {
+    return static_cast<Status>(*rawVal.begin());
+}
+
+}  // namespace meta
+}  // namespace nebula
+
diff --git a/src/meta/processors/admin/BalancePlan.h b/src/meta/processors/admin/BalancePlan.h
new file mode 100644
index 00000000000..6a2d8e47a7a
--- /dev/null
+++ b/src/meta/processors/admin/BalancePlan.h
@@ -0,0 +1,93 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef META_ADMIN_BALANCEPLAN_H_
+#define META_ADMIN_BALANCEPLAN_H_
+
+#include <gtest/gtest_prod.h>
+#include "kvstore/KVStore.h"
+#include "meta/processors/admin/BalanceTask.h"
+
+namespace nebula {
+namespace meta {
+
+class BalancePlan {
+    friend class Balancer;
+    FRIEND_TEST(BalanceTest, BalancePlanTest);
+    FRIEND_TEST(BalanceTest, NormalTest);
+    FRIEND_TEST(BalanceTest, RecoveryTest);
+
+public:
+    enum class Status : uint8_t {
+        NOT_START          = 0x01,
+        IN_PROGRESS        = 0x02,
+        SUCCEEDED          = 0x03,
+        /**
+         * TODO(heng): Currently, after the plan failed, we will try to resume it
+         * when running "balance" again. But in many cases, the plan will be failed
+         * forever, it this cases, we should rollback the plan.
+         * */
+        FAILED             = 0x04,
+    };
+
+    BalancePlan(BalanceID id, kvstore::KVStore* kv, AdminClient* client)
+        : id_(id)
+        , kv_(kv)
+        , client_(client) {}
+
+    void addTask(BalanceTask task) {
+        tasks_.emplace_back(std::move(task));
+    }
+
+    /**
+     * The method should be called after add all tasks into plan.
+     * */
+    void registerTaskCb();
+
+    void invoke();
+
+    /**
+     * TODO(heng): How to rollback if the some tasks failed.
+     * For the tasks before UPDATE_META, they will go back to the original state before balance.
+     * For the tasks after UPDATE_META, they will go on until succeeded.
+     * NOTES: update_meta should be an atomic operation. There is no middle state inside.
+     * */
+    void rollback() {}
+
+    bool saveInStore(bool onlyPlan = false);
+
+    BalanceID id() const {
+        return id_;
+    }
+
+private:
+    bool recovery();
+
+    std::string planKey() const;
+
+    std::string planVal() const;
+
+    static const std::string& prefix();
+
+    static BalanceID id(const folly::StringPiece& rawKey);
+
+    static BalancePlan::Status status(const folly::StringPiece& rawVal);
+
+private:
+    BalanceID id_ = 0;
+    kvstore::KVStore* kv_ = nullptr;
+    AdminClient* client_ = nullptr;
+    std::vector<BalanceTask> tasks_;
+    std::mutex lock_;
+    size_t finishedTaskNum_ = 0;
+    std::function<void()> onFinished_;
+    Status status_ = Status::NOT_START;
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_ADMIN_BALANCEPLAN_H_
diff --git a/src/meta/processors/admin/BalanceProcessor.cpp b/src/meta/processors/admin/BalanceProcessor.cpp
new file mode 100644
index 00000000000..56e27b6c37b
--- /dev/null
+++ b/src/meta/processors/admin/BalanceProcessor.cpp
@@ -0,0 +1,49 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+
+#include "meta/processors/admin/BalanceProcessor.h"
+#include "meta/processors/admin/Balancer.h"
+
+namespace nebula {
+namespace meta {
+
+void BalanceProcessor::process(const cpp2::BalanceReq& req) {
+    if (req.get_space_id() != nullptr) {
+        LOG(ERROR) << "Unsupport balance for specific space " << *req.get_space_id();
+        resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED);
+        onFinished();
+        return;
+    }
+    if (req.get_id() != nullptr) {
+        LOG(ERROR) << "Unsupport show status for specific balance plan, id=" << *req.get_id();
+        resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED);
+        onFinished();
+        return;
+    }
+    auto hosts = ActiveHostsMan::instance()->getActiveHosts();
+    if (hosts.empty()) {
+        LOG(ERROR) << "There is no active hosts";
+        resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS);
+        onFinished();
+        return;
+    }
+    auto ret = Balancer::instance(kvstore_)->balance();
+    if (!ret.ok()) {
+        LOG(INFO) << "The balancer is running.";
+        resp_.set_code(cpp2::ErrorCode::E_BALANCER_RUNNING);
+        onFinished();
+        return;
+    }
+    resp_.set_id(ret.value());
+    resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
+    onFinished();
+}
+
+
+}  // namespace meta
+}  // namespace nebula
+
diff --git a/src/meta/processors/admin/BalanceProcessor.h b/src/meta/processors/admin/BalanceProcessor.h
new file mode 100644
index 00000000000..aaea85fcc9c
--- /dev/null
+++ b/src/meta/processors/admin/BalanceProcessor.h
@@ -0,0 +1,34 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef META_BALANCEPROCESSOR_H_
+#define META_BALANCEPROCESSOR_H_
+
+#include <gtest/gtest_prod.h>
+#include "meta/processors/BaseProcessor.h"
+#include "meta/ActiveHostsMan.h"
+#include "time/TimeUtils.h"
+
+namespace nebula {
+namespace meta {
+
+class BalanceProcessor : public BaseProcessor<cpp2::BalanceResp> {
+public:
+    static BalanceProcessor* instance(kvstore::KVStore* kvstore) {
+        return new BalanceProcessor(kvstore);
+    }
+
+    void process(const cpp2::BalanceReq& req);
+
+private:
+    explicit BalanceProcessor(kvstore::KVStore* kvstore)
+            : BaseProcessor<cpp2::BalanceResp>(kvstore) {}
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_BALANCEPROCESSOR_H_
diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp
new file mode 100644
index 00000000000..6716976e514
--- /dev/null
+++ b/src/meta/processors/admin/BalanceTask.cpp
@@ -0,0 +1,234 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "meta/processors/admin/BalanceTask.h"
+#include <folly/synchronization/Baton.h>
+#include "meta/processors/Common.h"
+
+namespace nebula {
+namespace meta {
+
+#define SAVE_STATE() \
+    if (!saveInStore()) { \
+        ret_ = Result::FAILED; \
+        onError_(); \
+        return; \
+    }
+
+const std::string kBalanceTaskTable = "__b_task__"; // NOLINT
+
+void BalanceTask::invoke() {
+    CHECK_NOTNULL(client_);
+    if (ret_ == Result::FAILED) {
+        endTimeMs_ = time::TimeUtils::nowInSeconds();
+        saveInStore();
+        onError_();
+        return;
+    }
+    switch (status_) {
+        case Status::START: {
+            LOG(INFO) << taskIdStr_ << "Start to move part!";
+            status_ = Status::CHANGE_LEADER;
+            ret_ = Result::IN_PROGRESS;
+            startTimeMs_ = time::TimeUtils::nowInSeconds();
+        }
+        case Status::CHANGE_LEADER: {
+            LOG(INFO) << taskIdStr_ << "Ask the src to give up the leadership.";
+            SAVE_STATE();
+            client_->transLeader(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) {
+                if (!resp.ok()) {
+                    ret_ = Result::FAILED;
+                } else {
+                    status_ = Status::ADD_PART_ON_DST;
+                }
+                invoke();
+            });
+            break;
+        }
+        case Status::ADD_PART_ON_DST: {
+            LOG(INFO) << taskIdStr_ << "Open the part as learner on dst.";
+            SAVE_STATE();
+            client_->addPart(spaceId_, partId_, dst_, true).thenValue([this](auto&& resp) {
+                if (!resp.ok()) {
+                    ret_ = Result::FAILED;
+                } else {
+                    status_ = Status::ADD_LEARNER;
+                }
+                invoke();
+            });
+            break;
+        }
+        case Status::ADD_LEARNER: {
+            LOG(INFO) << taskIdStr_ << "Add learner dst.";
+            SAVE_STATE();
+            client_->addLearner(spaceId_, partId_).thenValue([this](auto&& resp) {
+                if (!resp.ok()) {
+                    ret_ = Result::FAILED;
+                } else {
+                    status_ = Status::CATCH_UP_DATA;
+                }
+                invoke();
+            });
+            break;
+        }
+        case Status::CATCH_UP_DATA: {
+            LOG(INFO) << taskIdStr_ << "Waiting for the data catch up.";
+            SAVE_STATE();
+            client_->waitingForCatchUpData(spaceId_, partId_).thenValue([this](auto&& resp) {
+                if (!resp.ok()) {
+                    ret_ = Result::FAILED;
+                } else {
+                    status_ = Status::MEMBER_CHANGE;
+                }
+                invoke();
+            });
+            break;
+        }
+        case Status::MEMBER_CHANGE: {
+            LOG(INFO) << taskIdStr_ << "Send member change request to the leader"
+                      << ", it will add the new member on dst host"
+                      << " and remove the old member on src host.";
+            SAVE_STATE();
+            client_->memberChange(spaceId_, partId_).thenValue([this](auto&& resp) {
+                if (!resp.ok()) {
+                    ret_ = Result::FAILED;
+                } else {
+                    status_ = Status::UPDATE_PART_META;
+                }
+                invoke();
+            });
+            break;
+        }
+        case Status::UPDATE_PART_META: {
+            LOG(INFO) << taskIdStr_ << "Update meta for part.";
+            SAVE_STATE();
+            client_->updateMeta(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) {
+                if (!resp.ok()) {
+                    ret_ = Result::FAILED;
+                } else {
+                    status_ = Status::REMOVE_PART_ON_SRC;
+                }
+                invoke();
+            });
+            break;
+        }
+        case Status::REMOVE_PART_ON_SRC: {
+            LOG(INFO) << taskIdStr_ << "Close part on src host.";
+            SAVE_STATE();
+            client_->removePart(spaceId_, partId_, src_).thenValue([this](auto&& resp) {
+                if (!resp.ok()) {
+                    ret_ = Result::FAILED;
+                } else {
+                    ret_ = Result::SUCCEEDED;
+                    status_ = Status::END;
+                }
+                invoke();
+            });
+            break;
+        }
+        case Status::END: {
+            LOG(INFO) << taskIdStr_ <<  "Part has been moved successfully!";
+            endTimeMs_ = time::TimeUtils::nowInSeconds();
+            SAVE_STATE();
+            onFinished_();
+            break;
+        }
+    }
+    return;
+}
+
+void BalanceTask::rollback() {
+    if (status_ < Status::UPDATE_PART_META) {
+        // TODO(heng): restart the part on its peers.
+    } else {
+        // TODO(heng): Go on the task.
+    }
+}
+
+bool BalanceTask::saveInStore() {
+    if (kv_) {
+        std::vector<kvstore::KV> data;
+        data.emplace_back(taskKey(), taskVal());
+        folly::Baton<true, std::atomic> baton;
+        bool ret = false;
+        kv_->asyncMultiPut(kDefaultSpaceId,
+                           kDefaultPartId,
+                           std::move(data),
+                           [this, &ret, &baton] (kvstore::ResultCode code) {
+            if (kvstore::ResultCode::SUCCEEDED == code) {
+                ret = true;
+            } else {
+                LOG(INFO) << taskIdStr_ << "Can't persist task!";
+            }
+            baton.post();
+        });
+        baton.wait();
+        return ret;
+    }
+    return true;
+}
+
+std::string BalanceTask::taskKey() {
+    std::string str;
+    str.reserve(64);
+    str.append(reinterpret_cast<const char*>(kBalanceTaskTable.data()), kBalanceTaskTable.size());
+    str.append(reinterpret_cast<const char*>(&balanceId_), sizeof(balanceId_));
+    str.append(reinterpret_cast<const char*>(&spaceId_), sizeof(spaceId_));
+    str.append(reinterpret_cast<const char*>(&partId_), sizeof(partId_));
+    str.append(reinterpret_cast<const char*>(&src_), sizeof(src_));
+    str.append(reinterpret_cast<const char*>(&dst_), sizeof(dst_));
+    return str;
+}
+
+std::string BalanceTask::taskVal() {
+    std::string str;
+    str.reserve(32);
+    str.append(reinterpret_cast<const char*>(&status_), sizeof(status_));
+    str.append(reinterpret_cast<const char*>(&ret_), sizeof(ret_));
+    str.append(reinterpret_cast<const char*>(&startTimeMs_), sizeof(startTimeMs_));
+    str.append(reinterpret_cast<const char*>(&endTimeMs_), sizeof(endTimeMs_));
+    return str;
+}
+
+std::string BalanceTask::prefix(BalanceID balanceId) {
+    std::string str;
+    str.reserve(32);
+    str.append(reinterpret_cast<const char*>(kBalanceTaskTable.data()), kBalanceTaskTable.size());
+    str.append(reinterpret_cast<const char*>(&balanceId), sizeof(balanceId));
+    return str;
+}
+
+std::tuple<BalanceID, GraphSpaceID, PartitionID, HostAddr, HostAddr>
+BalanceTask::parseKey(const folly::StringPiece& rawKey) {
+    int32_t offset = kBalanceTaskTable.size();
+    auto balanceId = *reinterpret_cast<const BalanceID*>(rawKey.begin() + offset);
+    offset += sizeof(balanceId);
+    auto spaceId = *reinterpret_cast<const GraphSpaceID*>(rawKey.begin() + offset);
+    offset += sizeof(GraphSpaceID);
+    auto partId = *reinterpret_cast<const PartitionID*>(rawKey.begin() + offset);
+    offset += sizeof(PartitionID);
+    auto src = *reinterpret_cast<const HostAddr*>(rawKey.begin() + offset);
+    offset += sizeof(HostAddr);
+    auto dst = *reinterpret_cast<const HostAddr*>(rawKey.begin() + offset);
+    return std::make_tuple(balanceId, spaceId, partId, src, dst);
+}
+
+std::tuple<BalanceTask::Status, BalanceTask::Result, int64_t, int64_t>
+BalanceTask::parseVal(const folly::StringPiece& rawVal) {
+    int32_t offset = 0;
+    auto status = *reinterpret_cast<const BalanceTask::Status*>(rawVal.begin() + offset);
+    offset += sizeof(BalanceTask::Status);
+    auto ret = *reinterpret_cast<const BalanceTask::Result*>(rawVal.begin() + offset);
+    offset += sizeof(BalanceTask::Result);
+    auto start = *reinterpret_cast<const int64_t*>(rawVal.begin() + offset);
+    offset += sizeof(int64_t);
+    auto end = *reinterpret_cast<const int64_t*>(rawVal.begin() + offset);
+    return std::make_tuple(status, ret, start, end);
+}
+
+}  // namespace meta
+}  // namespace nebula
+
diff --git a/src/meta/processors/admin/BalanceTask.h b/src/meta/processors/admin/BalanceTask.h
new file mode 100644
index 00000000000..4e114ea8c8a
--- /dev/null
+++ b/src/meta/processors/admin/BalanceTask.h
@@ -0,0 +1,114 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef META_ADMIN_BALANCETASK_H_
+#define META_ADMIN_BALANCETASK_H_
+
+#include <gtest/gtest_prod.h>
+#include "meta/ActiveHostsMan.h"
+#include "time/TimeUtils.h"
+#include "kvstore/KVStore.h"
+#include "network/NetworkUtils.h"
+#include "meta/processors/admin/AdminClient.h"
+#include "meta/processors/Common.h"
+
+namespace nebula {
+namespace meta {
+
+class BalanceTask {
+    friend class BalancePlan;
+    FRIEND_TEST(BalanceTaskTest, SimpleTest);
+    FRIEND_TEST(BalanceTest, BalancePlanTest);
+    FRIEND_TEST(BalanceTest, NormalTest);
+    FRIEND_TEST(BalanceTest, RecoveryTest);
+
+public:
+    BalanceTask() = default;
+    BalanceTask(BalanceID balanceId,
+                GraphSpaceID spaceId,
+                PartitionID partId,
+                const HostAddr& src,
+                const HostAddr& dst,
+                kvstore::KVStore* kv,
+                AdminClient* client)
+        : balanceId_(balanceId)
+        , spaceId_(spaceId)
+        , partId_(partId)
+        , src_(src)
+        , dst_(dst)
+        , taskIdStr_(folly::stringPrintf(
+                                      "[%ld, %d, %s:%d->%s:%d] ",
+                                      balanceId,
+                                      partId,
+                                      network::NetworkUtils::intToIPv4(src.first).c_str(),
+                                      src.second,
+                                      network::NetworkUtils::intToIPv4(dst.first).c_str(),
+                                      dst.second))
+        , kv_(kv)
+        , client_(client) {}
+
+    const std::string& taskIdStr() const {
+        return taskIdStr_;
+    }
+
+    void invoke();
+
+    void rollback();
+
+private:
+    enum class Status : uint8_t {
+        START               = 0x01,
+        CHANGE_LEADER       = 0x02,
+        ADD_PART_ON_DST     = 0x03,
+        ADD_LEARNER         = 0x04,
+        CATCH_UP_DATA       = 0x05,
+        MEMBER_CHANGE       = 0x06,
+        UPDATE_PART_META    = 0x07,  // After this state, we can't rollback anymore.
+        REMOVE_PART_ON_SRC  = 0x08,
+        END                 = 0xFF,
+    };
+
+    enum class Result : uint8_t {
+        SUCCEEDED           = 0x01,
+        FAILED              = 0x02,
+        IN_PROGRESS         = 0x03,
+    };
+
+    bool saveInStore();
+
+    std::string taskKey();
+
+    std::string taskVal();
+
+    static std::string prefix(BalanceID balanceId);
+
+    static std::tuple<BalanceID, GraphSpaceID, PartitionID, HostAddr, HostAddr>
+    parseKey(const folly::StringPiece& rawKey);
+
+    static std::tuple<BalanceTask::Status, BalanceTask::Result, int64_t, int64_t>
+    parseVal(const folly::StringPiece& rawVal);
+
+private:
+    BalanceID    balanceId_;
+    GraphSpaceID spaceId_;
+    PartitionID  partId_;
+    HostAddr     src_;
+    HostAddr     dst_;
+    std::string  taskIdStr_;
+    kvstore::KVStore* kv_ = nullptr;
+    AdminClient* client_ = nullptr;
+    Status       status_ = Status::START;
+    Result       ret_ = Result::IN_PROGRESS;
+    int64_t      startTimeMs_ = 0;
+    int64_t      endTimeMs_ = 0;
+    std::function<void()> onFinished_;
+    std::function<void()> onError_;
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_ADMIN_BALANCETASK_H_
diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp
new file mode 100644
index 00000000000..38fca08ed39
--- /dev/null
+++ b/src/meta/processors/admin/Balancer.cpp
@@ -0,0 +1,318 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "meta/processors/admin/Balancer.h"
+#include <algorithm>
+#include <cstdlib>
+#include "meta/processors/Common.h"
+#include "meta/ActiveHostsMan.h"
+#include "meta/MetaServiceUtils.h"
+
+namespace nebula {
+namespace meta {
+
+StatusOr<BalanceID> Balancer::balance() {
+    bool expected = false;
+    if (running_.compare_exchange_strong(expected, true)) {
+        if (!recovery()) {
+            LOG(ERROR) << "Recovery balancer failed!";
+            return Status::Error("Can't do balance because there is one corruptted balance plan!");
+        }
+        if (plan_ == nullptr) {
+            LOG(INFO) << "There is no corrupted plan need to recovery, so create a new one";
+            auto status = buildBalancePlan();
+            if (plan_ == nullptr) {
+                LOG(ERROR) << "Create balance plan failed!";
+                return status;
+            }
+        }
+        executor_->add(std::bind(&BalancePlan::invoke, plan_.get()));
+        return plan_->id();
+    }
+    return Status::Error("balance running");
+}
+
+bool Balancer::recovery() {
+    CHECK(!plan_) << "plan should be nullptr now";
+    if (kv_) {
+        const auto& prefix = BalancePlan::prefix();
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        if (ret != kvstore::ResultCode::SUCCEEDED) {
+            LOG(ERROR) << "Can't access kvstore, ret = " << static_cast<int32_t>(ret);
+            return false;
+        }
+        std::vector<int64_t> corruptedPlans;
+        while (iter->valid()) {
+            auto balanceId = BalancePlan::id(iter->key());
+            auto status = BalancePlan::status(iter->val());
+            if (status == BalancePlan::Status::IN_PROGRESS
+                    || status == BalancePlan::Status::FAILED) {
+                corruptedPlans.emplace_back(balanceId);
+            }
+            iter->next();
+        }
+        if (corruptedPlans.empty()) {
+            LOG(INFO) << "No corrupted plan need to recovery!";
+            return true;
+        }
+        CHECK_EQ(1, corruptedPlans.size());
+        plan_ = std::make_unique<BalancePlan>(corruptedPlans[0], kv_, client_.get());
+        plan_->onFinished_ = [this] () {
+            plan_.reset();
+            bool expected = true;
+            CHECK(running_.compare_exchange_strong(expected, false));
+        };
+        if (!plan_->recovery()) {
+            LOG(ERROR) << "Can't recovery plan " << corruptedPlans[0];
+            plan_->onFinished_();
+            return false;
+        }
+        plan_->registerTaskCb();
+    }
+    return true;
+}
+
+Status Balancer::buildBalancePlan() {
+    CHECK(!plan_) << "plan should be nullptr now";
+    std::vector<GraphSpaceID> spaces;
+    {
+        // Get all spaces
+        folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
+        auto prefix = MetaServiceUtils::spacePrefix();
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        if (ret != kvstore::ResultCode::SUCCEEDED) {
+            running_ = false;
+            return Status::Error("Can't access kvstore, ret = %d", static_cast<int32_t>(ret));
+        }
+        while (iter->valid()) {
+            auto spaceId = MetaServiceUtils::spaceId(iter->key());
+            spaces.push_back(spaceId);
+            iter->next();
+        }
+    }
+    plan_ = std::make_unique<BalancePlan>(time::TimeUtils::nowInSeconds(), kv_, client_.get());
+    for (auto spaceId : spaces) {
+        auto tasks = genTasks(spaceId);
+        for (auto& task : tasks) {
+            plan_->addTask(std::move(task));
+        }
+    }
+    plan_->onFinished_ = [this] () {
+        plan_.reset();
+        bool expected = true;
+        CHECK(running_.compare_exchange_strong(expected, false));
+    };
+    plan_->registerTaskCb();
+    if (plan_->tasks_.empty()) {
+        plan_->onFinished_();
+        return Status::Error("No Tasks");
+    }
+    if (!plan_->saveInStore()) {
+        plan_->onFinished_();
+        return Status::Error("Can't persist the plan");
+    }
+    return Status::OK();
+}
+
+std::vector<BalanceTask> Balancer::genTasks(GraphSpaceID spaceId) {
+    CHECK(!!plan_) << "plan should not be nullptr";
+    std::unordered_map<HostAddr, std::vector<PartitionID>> hostParts;
+    int32_t totalParts = 0;
+    getHostParts(spaceId, hostParts, totalParts);
+    if (totalParts == 0 || hostParts.empty()) {
+        LOG(ERROR) << "Invalid space " << spaceId;
+        return std::vector<BalanceTask>();
+    }
+    auto activeHosts = ActiveHostsMan::instance()->getActiveHosts();
+    std::vector<HostAddr> newlyAdded;
+    std::vector<HostAddr> lost;
+    calDiff(hostParts, activeHosts, newlyAdded, lost);
+    decltype(hostParts) newHostParts(hostParts);
+    for (auto& h : newlyAdded) {
+        newHostParts.emplace(h, std::vector<PartitionID>());
+    }
+    for (auto& h : lost) {
+        newHostParts.erase(h);
+    }
+    LOG(INFO) << "Now, try to balance the newHostParts";
+    // We have two parts need to balance, the first one is parts on lost hosts
+    // The seconds one is parts on unbalanced host in newHostParts.
+    std::vector<BalanceTask> tasks;
+    for (auto& h : lost) {
+        auto& lostParts = hostParts[h];
+        for (auto& partId : lostParts) {
+            auto ret = hostWithMinimalParts(newHostParts, partId);
+            if (!ret.ok()) {
+                LOG(ERROR) << "Error:" << ret.status();
+                return std::vector<BalanceTask>();
+            }
+            auto& luckyHost = ret.value();
+            newHostParts[luckyHost].emplace_back(partId);
+            tasks.emplace_back(plan_->id_,
+                               spaceId,
+                               partId,
+                               h,
+                               luckyHost,
+                               kv_,
+                               client_.get());
+        }
+    }
+    if (newHostParts.size() < 2) {
+        LOG(INFO) << "Too few hosts, no need for balance!";
+        return tasks;
+    }
+    balanceParts(plan_->id_, spaceId, newHostParts, totalParts, tasks);
+    return tasks;
+}
+
+void Balancer::balanceParts(BalanceID balanceId,
+                            GraphSpaceID spaceId,
+                            std::unordered_map<HostAddr, std::vector<PartitionID>>& newHostParts,
+                            int32_t totalParts,
+                            std::vector<BalanceTask>& tasks) {
+    float avgLoad = static_cast<float>(totalParts)/newHostParts.size();
+    LOG(INFO) << "The expect avg load is " << avgLoad;
+    int32_t minLoad = std::floor(avgLoad);
+    int32_t maxLoad = std::ceil(avgLoad);
+    auto hosts = sortedHostsByParts(newHostParts);
+    CHECK_GT(hosts.size(), 1);
+    auto maxPartsHost = hosts.back();
+    auto minPartsHost = hosts.front();
+    auto lastDelta = maxPartsHost.second - minPartsHost.second + 1;
+    while (maxPartsHost.second > maxLoad
+            || minPartsHost.second < minLoad
+            || maxPartsHost.second - minPartsHost.second < lastDelta) {
+        CHECK_GE(maxPartsHost.second, avgLoad);
+        CHECK_GE(avgLoad, minPartsHost.second);
+        auto& partsFrom = newHostParts[maxPartsHost.first];
+        auto& partsTo = newHostParts[minPartsHost.first];
+        VLOG(1) << maxPartsHost.first << ":" << partsFrom.size()
+                << "->" << minPartsHost.first << ":" << partsTo.size()
+                << ", lastDelta=" << lastDelta;
+        std::vector<PartitionID> diff;
+        std::set_difference(partsFrom.begin(), partsFrom.end(), partsTo.begin(), partsTo.end(),
+                            std::inserter(diff, diff.begin()));
+        bool noAction = true;
+        for (auto& partId : diff) {
+            if (partsFrom.size() <= partsTo.size() + 1
+                    || partsFrom.size() <= (size_t)minLoad
+                    || partsTo.size() >= (size_t)maxLoad) {
+                VLOG(1) << "No need to move any parts from "
+                        << maxPartsHost.first << " to " << minPartsHost.first;
+                break;
+            }
+            VLOG(1) << maxPartsHost.first << "->" << minPartsHost.first << ": " << partId;
+            auto it = std::find(partsFrom.begin(), partsFrom.end(), partId);
+            CHECK(it != partsFrom.end());
+            partsFrom.erase(it);
+            partsTo.emplace_back(partId);
+            tasks.emplace_back(balanceId,
+                               spaceId,
+                               partId,
+                               maxPartsHost.first,
+                               minPartsHost.first,
+                               kv_,
+                               client_.get());
+            noAction = false;
+        }
+        if (noAction) {
+            break;
+        }
+        lastDelta = maxPartsHost.second - minPartsHost.second;
+        hosts = sortedHostsByParts(newHostParts);
+        maxPartsHost = hosts.back();
+        minPartsHost = hosts.front();
+    }
+    LOG(INFO) << "Balance tasks num: " << tasks.size();
+    for (auto& task : tasks) {
+        LOG(INFO) << task.taskIdStr();
+    }
+}
+
+void Balancer::getHostParts(GraphSpaceID spaceId,
+                            std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+                            int32_t& totalParts) {
+    folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
+    auto prefix = MetaServiceUtils::partPrefix(spaceId);
+    std::unique_ptr<kvstore::KVIterator> iter;
+    auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+    if (ret != kvstore::ResultCode::SUCCEEDED) {
+        LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId;
+        return;
+    }
+    while (iter->valid()) {
+        auto key = iter->key();
+        PartitionID partId;
+        memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID));
+        auto partHosts = MetaServiceUtils::parsePartVal(iter->val());
+        for (auto& ph : partHosts) {
+            hostParts[HostAddr(ph.ip, ph.port)].emplace_back(partId);
+        }
+        iter->next();
+        totalParts++;
+    }
+    auto key = MetaServiceUtils::spaceKey(spaceId);
+    std::string value;
+    auto code = kv_->get(kDefaultSpaceId, kDefaultPartId, key, &value);
+    if (code != kvstore::ResultCode::SUCCEEDED) {
+        LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId;
+        return;
+    }
+    auto properties = MetaServiceUtils::parseSpace(value);
+    CHECK_EQ(totalParts, properties.get_partition_num());
+    totalParts *= properties.get_replica_factor();
+}
+
+void Balancer::calDiff(const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+                       const std::vector<HostAddr>& activeHosts,
+                       std::vector<HostAddr>& newlyAdded,
+                       std::vector<HostAddr>& lost) {
+    for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
+        VLOG(3) << "Host " << it->first << ", parts " << it->second.size();
+        if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end()) {
+            lost.emplace_back(it->first);
+        }
+    }
+    for (auto& h : activeHosts) {
+        VLOG(3) << "Actvie Host " << h;
+        if (hostParts.find(h) == hostParts.end()) {
+            newlyAdded.emplace_back(h);
+        }
+    }
+}
+
+std::vector<std::pair<HostAddr, int32_t>>
+Balancer::sortedHostsByParts(const std::unordered_map<HostAddr,
+                                                      std::vector<PartitionID>>& hostParts) {
+    std::vector<std::pair<HostAddr, int32_t>> hosts;
+    for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
+        hosts.emplace_back(it->first, it->second.size());
+    }
+    std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) {
+        return l.second < r.second;
+    });
+    return hosts;
+}
+
+StatusOr<HostAddr> Balancer::hostWithMinimalParts(
+                        const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+                        PartitionID partId) {
+    auto hosts = sortedHostsByParts(hostParts);
+    for (auto& h : hosts) {
+        auto it = hostParts.find(h.first);
+        CHECK(it != hostParts.end());
+        if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) {
+            return h.first;
+        }
+    }
+    return Status::Error("No host is suitable for %d", partId);
+}
+
+}  // namespace meta
+}  // namespace nebula
+
diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h
new file mode 100644
index 00000000000..5b4bdea2fb8
--- /dev/null
+++ b/src/meta/processors/admin/Balancer.h
@@ -0,0 +1,119 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef META_ADMIN_BALANCERR_H_
+#define META_ADMIN_BALANCERR_H_
+
+#include <gtest/gtest_prod.h>
+#include <folly/executors/CPUThreadPoolExecutor.h>
+#include "time/TimeUtils.h"
+#include "kvstore/KVStore.h"
+#include "network/NetworkUtils.h"
+#include "meta/processors/admin/AdminClient.h"
+#include "meta/processors/admin/BalanceTask.h"
+#include "meta/processors/admin/BalancePlan.h"
+
+namespace nebula {
+namespace meta {
+/**
+There are two interfaces public:
+ * Balance:  it will construct a balance plan and invoked it. If last balance plan is not succeeded, it will
+ * try to resume it.
+ *
+ * Rollback: In many cases, if some plan failed forever, we call this interface to rollback.
+
+Some notes:
+1. Balance will generate balance plan according to current active hosts and parts allocation
+2. For the plan, we hope after moving the least parts , it will reach a reasonable state.
+3. Only one balance plan could be invoked at the same time.
+4. Each balance plan has one id, and we could show the status by "balance id" command
+   and after FO, we could resume the balance plan by type "balance" again.
+5. Each balance plan contains many balance tasks, the task represents the minimum movement unit.
+6. We save the whole balancePlan state in kvstore to do failover.
+7. Each balance task contains serval steps. And it should be executed step by step.
+8. One task failed will result in the whole balance plan failed.
+9. Currently, we hope tasks for the same part could be invoked serially
+ * */
+class Balancer {
+    FRIEND_TEST(BalanceTest, BalancePartsTest);
+    FRIEND_TEST(BalanceTest, NormalTest);
+    FRIEND_TEST(BalanceTest, RecoveryTest);
+
+public:
+    static Balancer* instance(kvstore::KVStore* kv) {
+        static std::unique_ptr<AdminClient> client(new AdminClient());
+        static std::unique_ptr<Balancer> balancer(new Balancer(kv, std::move(client)));
+        return balancer.get();
+    }
+
+    ~Balancer() = default;
+
+    /*
+     * Return Error if reject the balance request, otherwise return balance id.
+     * */
+    StatusOr<BalanceID> balance();
+
+    /**
+     * 
+     * */
+    Status rollback(BalanceID id) {
+        return Status::Error("unplemented, %ld", id);
+    }
+
+private:
+    Balancer(kvstore::KVStore* kv, std::unique_ptr<AdminClient> client)
+        : kv_(kv)
+        , client_(std::move(client)) {
+        executor_.reset(new folly::CPUThreadPoolExecutor(1));
+    }
+    /*
+     * When the balancer failover, we should recovery the status.
+     * */
+    bool recovery();
+
+    /**
+     * Build balance plan and save it in kvstore.
+     * */
+    Status buildBalancePlan();
+
+    std::vector<BalanceTask> genTasks(GraphSpaceID spaceId);
+
+    void getHostParts(GraphSpaceID spaceId,
+                      std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+                      int32_t& totalParts);
+
+    void calDiff(const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+                 const std::vector<HostAddr>& activeHosts,
+                 std::vector<HostAddr>& newlyAdded,
+                 std::vector<HostAddr>& lost);
+
+    StatusOr<HostAddr> hostWithMinimalParts(
+                        const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+                        PartitionID partId);
+
+    void balanceParts(BalanceID balanceId,
+                      GraphSpaceID spaceId,
+                      std::unordered_map<HostAddr, std::vector<PartitionID>>& newHostParts,
+                      int32_t totalParts,
+                      std::vector<BalanceTask>& tasks);
+
+
+    std::vector<std::pair<HostAddr, int32_t>>
+    sortedHostsByParts(const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts);
+
+private:
+    std::atomic_bool  running_{false};
+    kvstore::KVStore* kv_ = nullptr;
+    std::unique_ptr<AdminClient> client_{nullptr};
+    // Current running plan.
+    std::unique_ptr<BalancePlan> plan_{nullptr};
+    std::unique_ptr<folly::Executor> executor_;
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_ADMIN_BALANCERR_H_
diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp
new file mode 100644
index 00000000000..48f4661d9e9
--- /dev/null
+++ b/src/meta/test/BalancerTest.cpp
@@ -0,0 +1,548 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+#include "base/Base.h"
+#include <gtest/gtest.h>
+#include <folly/executors/CPUThreadPoolExecutor.h>
+#include <folly/synchronization/Baton.h>
+#include "meta/processors/admin/Balancer.h"
+#include "meta/test/TestUtils.h"
+#include "fs/TempDir.h"
+#include "meta/processors/partsMan/CreateSpaceProcessor.h"
+
+namespace nebula {
+namespace meta {
+
+class TestFaultInjector : public FaultInjector {
+public:
+    explicit TestFaultInjector(std::vector<Status> sts)
+        : statusArray_(std::move(sts)) {
+        executor_.reset(new folly::CPUThreadPoolExecutor(1));
+    }
+
+    ~TestFaultInjector() {
+    }
+
+    folly::Future<Status> response(int index) {
+        folly::Promise<Status> pro;
+        auto f = pro.getFuture();
+        executor_->add([this, p = std::move(pro), index]() mutable {
+            p.setValue(this->statusArray_[index]);
+        });
+        return f;
+    }
+
+    folly::Future<Status> transLeader() override {
+        return response(0);
+    }
+
+    folly::Future<Status> addPart() override {
+        return response(1);
+    }
+
+    folly::Future<Status> addLearner() override {
+        return response(2);
+    }
+
+    folly::Future<Status> waitingForCatchUpData() override {
+        return response(3);
+    }
+
+    folly::Future<Status> memberChange() override {
+        return response(4);
+    }
+
+    folly::Future<Status> updateMeta() override {
+        return response(5);
+    }
+
+    folly::Future<Status> removePart() override {
+        return response(6);
+    }
+
+    void reset(std::vector<Status> sts) {
+        statusArray_ = std::move(sts);
+    }
+
+private:
+    std::vector<Status> statusArray_;
+    std::unique_ptr<folly::Executor> executor_;
+};
+
+TEST(BalanceTaskTest, SimpleTest) {
+    {
+        std::vector<Status> sts(7, Status::OK());
+        std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+        auto client = std::make_unique<AdminClient>(std::move(injector));
+        BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), nullptr, nullptr);
+        folly::Baton<true, std::atomic> b;
+        task.onFinished_ = [&]() {
+            LOG(INFO) << "Task finished!";
+            EXPECT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_);
+            EXPECT_EQ(BalanceTask::Status::END, task.status_);
+            b.post();
+        };
+        task.onError_ = []() {
+            LOG(FATAL) << "We should not reach here!";
+        };
+        task.client_ = client.get();
+        task.invoke();
+        b.wait();
+    }
+    {
+        std::vector<Status> sts{Status::Error("transLeader failed!"),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK()};
+        std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+        auto client = std::make_unique<AdminClient>(std::move(injector));
+        BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), nullptr, nullptr);
+        folly::Baton<true, std::atomic> b;
+        task.onFinished_ = []() {
+            LOG(FATAL) << "We should not reach here!";
+        };
+        task.onError_ = [&]() {
+            LOG(INFO) << "Error happens!";
+            EXPECT_EQ(BalanceTask::Result::FAILED, task.ret_);
+            EXPECT_EQ(BalanceTask::Status::CHANGE_LEADER, task.status_);
+            b.post();
+        };
+        task.client_ = client.get();
+        task.invoke();
+        b.wait();
+    }
+    LOG(INFO) << "Test finished!";
+}
+
+TEST(BalanceTest, BalancePartsTest) {
+    auto* balancer = Balancer::instance(nullptr);
+    auto dump = [](const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+                   const std::vector<BalanceTask>& tasks) {
+        for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
+            std::stringstream ss;
+            ss << it->first << ":";
+            for (auto partId : it->second) {
+                ss << partId << ",";
+            }
+            VLOG(1) << ss.str();
+        }
+        for (auto& task : tasks) {
+            VLOG(1) << task.taskIdStr();
+        }
+    };
+    {
+        std::unordered_map<HostAddr, std::vector<PartitionID>> hostParts;
+        hostParts.emplace(HostAddr(0, 0), std::vector<PartitionID>{1, 2, 3, 4});
+        hostParts.emplace(HostAddr(1, 0), std::vector<PartitionID>{1, 2, 3, 4});
+        hostParts.emplace(HostAddr(2, 0), std::vector<PartitionID>{1, 2, 3, 4});
+        hostParts.emplace(HostAddr(3, 0), std::vector<PartitionID>{});
+        int32_t totalParts = 12;
+        std::vector<BalanceTask> tasks;
+        VLOG(1) << "=== original map ====";
+        dump(hostParts, tasks);
+        balancer->balanceParts(0, 0, hostParts, totalParts, tasks);
+        VLOG(1) << "=== new map ====";
+        dump(hostParts, tasks);
+        for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
+            EXPECT_EQ(3, it->second.size());
+        }
+        EXPECT_EQ(3, tasks.size());
+    }
+    {
+        std::unordered_map<HostAddr, std::vector<PartitionID>> hostParts;
+        hostParts.emplace(HostAddr(0, 0), std::vector<PartitionID>{1, 2, 3, 4, 5});
+        hostParts.emplace(HostAddr(1, 0), std::vector<PartitionID>{1, 2, 4, 5});
+        hostParts.emplace(HostAddr(2, 0), std::vector<PartitionID>{2, 3, 4, 5});
+        hostParts.emplace(HostAddr(3, 0), std::vector<PartitionID>{1, 3});
+        int32_t totalParts = 15;
+        std::vector<BalanceTask> tasks;
+        VLOG(1) << "=== original map ====";
+        dump(hostParts, tasks);
+        balancer->balanceParts(0, 0, hostParts, totalParts, tasks);
+        VLOG(1) << "=== new map ====";
+        dump(hostParts, tasks);
+        EXPECT_EQ(4, hostParts[HostAddr(0, 0)].size());
+        EXPECT_EQ(4, hostParts[HostAddr(1, 0)].size());
+        EXPECT_EQ(4, hostParts[HostAddr(2, 0)].size());
+        EXPECT_EQ(3, hostParts[HostAddr(3, 0)].size());
+        EXPECT_EQ(1, tasks.size());
+    }
+    {
+        std::unordered_map<HostAddr, std::vector<PartitionID>> hostParts;
+        hostParts.emplace(HostAddr(0, 0), std::vector<PartitionID>{1, 2, 3, 4});
+        hostParts.emplace(HostAddr(1, 0), std::vector<PartitionID>{1, 2, 4, 5});
+        hostParts.emplace(HostAddr(2, 0), std::vector<PartitionID>{2, 3, 4, 5});
+        hostParts.emplace(HostAddr(3, 0), std::vector<PartitionID>{1, 3, 5});
+        int32_t totalParts = 15;
+        std::vector<BalanceTask> tasks;
+        VLOG(1) << "=== original map ====";
+        dump(hostParts, tasks);
+        balancer->balanceParts(0, 0, hostParts, totalParts, tasks);
+        VLOG(1) << "=== new map ====";
+        dump(hostParts, tasks);
+        EXPECT_EQ(4, hostParts[HostAddr(0, 0)].size());
+        EXPECT_EQ(4, hostParts[HostAddr(1, 0)].size());
+        EXPECT_EQ(4, hostParts[HostAddr(2, 0)].size());
+        EXPECT_EQ(3, hostParts[HostAddr(3, 0)].size());
+        EXPECT_EQ(0, tasks.size());
+    }
+    {
+        std::unordered_map<HostAddr, std::vector<PartitionID>> hostParts;
+        hostParts.emplace(HostAddr(0, 0), std::vector<PartitionID>{1, 2, 3, 4, 5, 6, 7, 8, 9});
+        hostParts.emplace(HostAddr(1, 0), std::vector<PartitionID>{1, 2, 3, 4, 5, 6, 7, 8, 9});
+        hostParts.emplace(HostAddr(2, 0), std::vector<PartitionID>{1, 2, 3, 4, 5, 6, 7, 8, 9});
+        hostParts.emplace(HostAddr(3, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(4, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(5, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(6, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(7, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(8, 0), std::vector<PartitionID>{});
+        int32_t totalParts = 27;
+        std::vector<BalanceTask> tasks;
+        VLOG(1) << "=== original map ====";
+        dump(hostParts, tasks);
+        balancer->balanceParts(0, 0, hostParts, totalParts, tasks);
+        VLOG(1) << "=== new map ====";
+        dump(hostParts, tasks);
+        for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
+            EXPECT_EQ(3, it->second.size());
+        }
+        EXPECT_EQ(18, tasks.size());
+    }
+    {
+        std::unordered_map<HostAddr, std::vector<PartitionID>> hostParts;
+        hostParts.emplace(HostAddr(0, 0), std::vector<PartitionID>{1, 2, 3, 4, 5, 6, 7, 8, 9});
+        hostParts.emplace(HostAddr(1, 0), std::vector<PartitionID>{1, 2, 3, 4, 5, 6, 7, 8, 9});
+        hostParts.emplace(HostAddr(2, 0), std::vector<PartitionID>{1, 2, 3, 4, 5, 6, 7, 8, 9});
+        hostParts.emplace(HostAddr(3, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(4, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(5, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(6, 0), std::vector<PartitionID>{});
+        hostParts.emplace(HostAddr(7, 0), std::vector<PartitionID>{});
+        int32_t totalParts = 27;
+        std::vector<BalanceTask> tasks;
+        VLOG(1) << "=== original map ====";
+        dump(hostParts, tasks);
+        balancer->balanceParts(0, 0, hostParts, totalParts, tasks);
+        VLOG(1) << "=== new map ====";
+        dump(hostParts, tasks);
+        for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
+            EXPECT_GE(4, it->second.size());
+            EXPECT_LE(3, it->second.size());
+        }
+    }
+}
+
+TEST(BalanceTest, BalancePlanTest) {
+    {
+        LOG(INFO) << "Test with all tasks succeeded!";
+        BalancePlan plan(0L, nullptr, nullptr);
+        std::vector<Status> sts(7, Status::OK());
+        std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+        auto client = std::make_unique<AdminClient>(std::move(injector));
+
+        for (int i = 0; i < 10; i++) {
+            BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+            task.client_ = client.get();
+            plan.addTask(std::move(task));
+        }
+        folly::Baton<true, std::atomic> b;
+        plan.onFinished_ = [&plan, &b] () {
+            ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_);
+            ASSERT_EQ(10, plan.finishedTaskNum_);
+            b.post();
+        };
+        plan.registerTaskCb();
+        plan.invoke();
+        b.wait();
+    }
+    {
+        LOG(INFO) << "Test with one task failed!";
+        BalancePlan plan(0L, nullptr, nullptr);
+        {
+            std::vector<Status> sts(7, Status::OK());
+            std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+            auto client = std::make_unique<AdminClient>(std::move(injector));
+            for (int i = 0; i < 9; i++) {
+                BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+                task.client_ = client.get();
+                plan.addTask(std::move(task));
+            }
+        }
+        {
+            std::vector<Status> sts {
+                                Status::Error("transLeader failed!"),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK()};
+            std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+            auto client = std::make_unique<AdminClient>(std::move(injector));
+            BalanceTask task(0, 0, 0, HostAddr(10, 0), HostAddr(10, 1), nullptr, nullptr);
+            task.client_ = client.get();
+            plan.addTask(std::move(task));
+        }
+        folly::Baton<true, std::atomic> b;
+        plan.onFinished_ = [&plan, &b] () {
+            ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_);
+            ASSERT_EQ(10, plan.finishedTaskNum_);
+            b.post();
+        };
+        plan.registerTaskCb();
+        plan.invoke();
+        b.wait();
+    }
+}
+
+TEST(BalanceTest, NormalTest) {
+    fs::TempDir rootPath("/tmp/BalanceTest.XXXXXX");
+    std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
+    FLAGS_expired_hosts_check_interval_sec = 1;
+    FLAGS_expired_threshold_sec = 1;
+    TestUtils::createSomeHosts(kv.get());
+    {
+        cpp2::SpaceProperties properties;
+        properties.set_space_name("default_space");
+        properties.set_partition_num(8);
+        properties.set_replica_factor(3);
+        cpp2::CreateSpaceReq req;
+        req.set_properties(std::move(properties));
+        auto* processor = CreateSpaceProcessor::instance(kv.get());
+        auto f = processor->getFuture();
+        processor->process(req);
+        auto resp = std::move(f).get();
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
+        ASSERT_EQ(1, resp.get_id().get_space_id());
+    }
+    std::vector<Status> sts(7, Status::OK());
+    std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+    auto client = std::make_unique<AdminClient>(std::move(injector));
+    Balancer balancer(kv.get(), std::move(client));
+    auto ret = balancer.balance();
+    CHECK_EQ(Status::Error("No tasks"), ret.status());
+
+    sleep(1);
+    LOG(INFO) << "Now, we lost host " << HostAddr(3, 3);
+    TestUtils::registerHB({{0, 0}, {1, 1}, {2, 2}});
+    ret = balancer.balance();
+    CHECK(ret.ok());
+    auto balanceId = ret.value();
+    sleep(1);
+    LOG(INFO) << "Rebalance finished!";
+    {
+        const auto& prefix = BalancePlan::prefix();
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
+        int num = 0;
+        while (iter->valid()) {
+            auto id = BalancePlan::id(iter->key());
+            auto status = BalancePlan::status(iter->val());
+            ASSERT_EQ(balanceId, id);
+            ASSERT_EQ(BalancePlan::Status::SUCCEEDED, status);
+            num++;
+            iter->next();
+        }
+        ASSERT_EQ(1, num);
+    }
+    {
+        const auto& prefix = BalanceTask::prefix(balanceId);
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
+        int32_t num = 0;
+        while (iter->valid()) {
+            BalanceTask task;
+            {
+                auto tup = BalanceTask::parseKey(iter->key());
+                task.balanceId_ = std::get<0>(tup);
+                ASSERT_EQ(balanceId, task.balanceId_);
+                task.spaceId_ = std::get<1>(tup);
+                ASSERT_EQ(1, task.spaceId_);
+                task.src_ = std::get<3>(tup);
+                ASSERT_EQ(HostAddr(3, 3), task.src_);
+            }
+            {
+                auto tup = BalanceTask::parseVal(iter->val());
+                task.status_ = std::get<0>(tup);
+                ASSERT_EQ(BalanceTask::Status::END, task.status_);
+                task.ret_ = std::get<1>(tup);
+                ASSERT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_);
+                task.startTimeMs_ = std::get<2>(tup);
+                ASSERT_GT(task.startTimeMs_, 0);
+                task.endTimeMs_ = std::get<3>(tup);
+                ASSERT_GT(task.endTimeMs_, 0);
+            }
+            num++;
+            iter->next();
+        }
+        ASSERT_EQ(6, num);
+    }
+}
+
+TEST(BalanceTest, RecoveryTest) {
+    fs::TempDir rootPath("/tmp/BalanceTest.XXXXXX");
+    std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
+    FLAGS_expired_hosts_check_interval_sec = 1;
+    FLAGS_expired_threshold_sec = 1;
+    TestUtils::createSomeHosts(kv.get());
+    {
+        cpp2::SpaceProperties properties;
+        properties.set_space_name("default_space");
+        properties.set_partition_num(8);
+        properties.set_replica_factor(3);
+        cpp2::CreateSpaceReq req;
+        req.set_properties(std::move(properties));
+        auto* processor = CreateSpaceProcessor::instance(kv.get());
+        auto f = processor->getFuture();
+        processor->process(req);
+        auto resp = std::move(f).get();
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
+        ASSERT_EQ(1, resp.get_id().get_space_id());
+    }
+
+    sleep(1);
+    LOG(INFO) << "Now, we lost host " << HostAddr(3, 3);
+    TestUtils::registerHB({{0, 0}, {1, 1}, {2, 2}});
+    std::vector<Status> sts {
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::Error("catch up data failed!"),
+                                Status::OK(),
+                                Status::OK(),
+                                Status::OK()};
+
+    std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+    auto client = std::make_unique<AdminClient>(std::move(injector));
+    Balancer balancer(kv.get(), std::move(client));
+    auto ret = balancer.balance();
+    CHECK(ret.ok());
+    auto balanceId = ret.value();
+    sleep(1);
+    {
+        const auto& prefix = BalancePlan::prefix();
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
+        int num = 0;
+        while (iter->valid()) {
+            auto id = BalancePlan::id(iter->key());
+            auto status = BalancePlan::status(iter->val());
+            ASSERT_EQ(balanceId, id);
+            ASSERT_EQ(BalancePlan::Status::FAILED, status);
+            num++;
+            iter->next();
+        }
+        ASSERT_EQ(1, num);
+    }
+    {
+        const auto& prefix = BalanceTask::prefix(balanceId);
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
+        int32_t num = 0;
+        while (iter->valid()) {
+            BalanceTask task;
+            {
+                auto tup = BalanceTask::parseKey(iter->key());
+                task.balanceId_ = std::get<0>(tup);
+                ASSERT_EQ(balanceId, task.balanceId_);
+                task.spaceId_ = std::get<1>(tup);
+                ASSERT_EQ(1, task.spaceId_);
+                task.src_ = std::get<3>(tup);
+                ASSERT_EQ(HostAddr(3, 3), task.src_);
+            }
+            {
+                auto tup = BalanceTask::parseVal(iter->val());
+                task.status_ = std::get<0>(tup);
+                ASSERT_EQ(BalanceTask::Status::CATCH_UP_DATA, task.status_);
+                task.ret_ = std::get<1>(tup);
+                ASSERT_EQ(BalanceTask::Result::FAILED, task.ret_);
+                task.startTimeMs_ = std::get<2>(tup);
+                ASSERT_GT(task.startTimeMs_, 0);
+                task.endTimeMs_ = std::get<3>(tup);
+                ASSERT_GT(task.endTimeMs_, 0);
+            }
+            num++;
+            iter->next();
+        }
+        ASSERT_EQ(6, num);
+    }
+    LOG(INFO) << "Now let's recovery it.";
+    std::vector<Status> normalSts(7, Status::OK());
+    static_cast<TestFaultInjector*>(balancer.client_->faultInjector())->reset(std::move(normalSts));
+    ret = balancer.balance();
+    CHECK(ret.ok());
+    balanceId = ret.value();
+    sleep(1);
+    {
+        const auto& prefix = BalancePlan::prefix();
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
+        int num = 0;
+        while (iter->valid()) {
+            auto id = BalancePlan::id(iter->key());
+            auto status = BalancePlan::status(iter->val());
+            ASSERT_EQ(balanceId, id);
+            ASSERT_EQ(BalancePlan::Status::SUCCEEDED, status);
+            num++;
+            iter->next();
+        }
+        ASSERT_EQ(1, num);
+    }
+    {
+        const auto& prefix = BalanceTask::prefix(balanceId);
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+        ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
+        int32_t num = 0;
+        while (iter->valid()) {
+            BalanceTask task;
+            {
+                auto tup = BalanceTask::parseKey(iter->key());
+                task.balanceId_ = std::get<0>(tup);
+                ASSERT_EQ(balanceId, task.balanceId_);
+                task.spaceId_ = std::get<1>(tup);
+                ASSERT_EQ(1, task.spaceId_);
+                task.src_ = std::get<3>(tup);
+                ASSERT_EQ(HostAddr(3, 3), task.src_);
+            }
+            {
+                auto tup = BalanceTask::parseVal(iter->val());
+                task.status_ = std::get<0>(tup);
+                ASSERT_EQ(BalanceTask::Status::END, task.status_);
+                task.ret_ = std::get<1>(tup);
+                ASSERT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_);
+                task.startTimeMs_ = std::get<2>(tup);
+                ASSERT_GT(task.startTimeMs_, 0);
+                task.endTimeMs_ = std::get<3>(tup);
+                ASSERT_GT(task.endTimeMs_, 0);
+            }
+            num++;
+            iter->next();
+        }
+        ASSERT_EQ(6, num);
+    }
+}
+
+}  // namespace meta
+}  // namespace nebula
+
+
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    folly::init(&argc, &argv, true);
+    google::SetStderrLogging(google::INFO);
+    return RUN_ALL_TESTS();
+}
+
+
diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt
index 77061e4f336..2d040d1cc80 100644
--- a/src/meta/test/CMakeLists.txt
+++ b/src/meta/test/CMakeLists.txt
@@ -137,7 +137,6 @@ nebula_link_libraries(
 )
 nebula_add_test(active_hosts_man_test)
 
-
 add_executable(
     meta_http_test
     MetaHttpHandlerTest.cpp
@@ -171,6 +170,35 @@ nebula_link_libraries(
     gtest
 )
 nebula_add_test(meta_http_test)
+
+add_executable(
+    balancer_test
+    BalancerTest.cpp
+    $<TARGET_OBJECTS:schema_obj>
+    $<TARGET_OBJECTS:meta_client>
+    $<TARGET_OBJECTS:meta_service_handler>
+    $<TARGET_OBJECTS:kvstore_obj>
+    $<TARGET_OBJECTS:meta_thrift_obj>
+    $<TARGET_OBJECTS:common_thrift_obj>
+    $<TARGET_OBJECTS:raftex_obj>
+    $<TARGET_OBJECTS:raftex_thrift_obj>
+    $<TARGET_OBJECTS:wal_obj>
+    $<TARGET_OBJECTS:thrift_obj>
+    $<TARGET_OBJECTS:base_obj>
+    $<TARGET_OBJECTS:thread_obj>
+    $<TARGET_OBJECTS:time_obj>
+    $<TARGET_OBJECTS:fs_obj>
+    $<TARGET_OBJECTS:network_obj>
+)
+nebula_link_libraries(
+    balancer_test
+    ${ROCKSDB_LIBRARIES}
+    ${THRIFT_LIBRARIES}
+    wangle
+    gtest
+)
+nebula_add_test(balancer_test)
+
 add_executable(
     authentication_test
     AuthProcessorTest.cpp
@@ -192,12 +220,11 @@ add_executable(
 )
 nebula_link_libraries(
     authentication_test
-    proxygenhttpserver
-    proxygenlib
     ${ROCKSDB_LIBRARIES}
     ${THRIFT_LIBRARIES}
     wangle
     gtest
 )
+
 nebula_add_test(authentication_test)