From a5485c693079967e8f23300fa5f527229c8ade2c Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 8 Nov 2019 13:23:01 +0800 Subject: [PATCH 1/3] balance stop --- src/graph/BalanceExecutor.cpp | 7 +- src/graph/BalanceExecutor.h | 4 +- src/interface/meta.thrift | 2 + src/kvstore/Part.cpp | 20 +- src/kvstore/raftex/RaftPart.cpp | 4 +- src/meta/client/MetaClient.cpp | 7 +- src/meta/client/MetaClient.h | 2 +- src/meta/processors/admin/BalancePlan.cpp | 11 + src/meta/processors/admin/BalancePlan.h | 3 + .../processors/admin/BalanceProcessor.cpp | 16 ++ src/meta/processors/admin/BalanceTask.h | 8 + src/meta/processors/admin/Balancer.cpp | 72 ++++--- src/meta/processors/admin/Balancer.h | 11 +- src/meta/test/BalancerTest.cpp | 193 ++++++++++++++++++ src/parser/AdminSentences.h | 1 + src/parser/parser.yy | 5 +- src/parser/scanner.lex | 2 + src/parser/test/ParserTest.cpp | 18 ++ 18 files changed, 348 insertions(+), 38 deletions(-) diff --git a/src/graph/BalanceExecutor.cpp b/src/graph/BalanceExecutor.cpp index fd5c25a53c4..d0a0b713dff 100644 --- a/src/graph/BalanceExecutor.cpp +++ b/src/graph/BalanceExecutor.cpp @@ -27,6 +27,9 @@ void BalanceExecutor::execute() { case BalanceSentence::SubType::kData: balanceData(); break; + case BalanceSentence::SubType::kDataStop: + balanceData(true); + break; case BalanceSentence::SubType::kShowBalancePlan: showBalancePlan(); break; @@ -66,8 +69,8 @@ void BalanceExecutor::balanceLeader() { std::move(future).via(runner).thenValue(cb).thenError(error); } -void BalanceExecutor::balanceData() { - auto future = ectx()->getMetaClient()->balance(); +void BalanceExecutor::balanceData(bool isStop) { + auto future = ectx()->getMetaClient()->balance(isStop); auto *runner = ectx()->rctx()->runner(); auto cb = [this] (auto &&resp) { diff --git a/src/graph/BalanceExecutor.h b/src/graph/BalanceExecutor.h index e9080e3b886..27c345b986e 100644 --- a/src/graph/BalanceExecutor.h +++ b/src/graph/BalanceExecutor.h @@ -27,7 +27,9 @@ class BalanceExecutor final : public Executor { void balanceLeader(); - void balanceData(); + void balanceData(bool isStop = false); + + void stopBalanceData(); void showBalancePlan(); diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 0fd5de72f31..ade6026f9ed 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -36,6 +36,7 @@ enum ErrorCode { E_STORE_SEGMENT_ILLEGAL = -32, E_BAD_BALANCE_PLAN = -33, E_BALANCED = -34, + E_NO_RUNNING_BALANCE_PLAN = -35, E_INVALID_PASSWORD = -41, E_INPROPER_ROLE = -42, @@ -443,6 +444,7 @@ struct BalanceReq { 1: optional common.GraphSpaceID space_id, // Specify the balance id to check the status of the related balance plan 2: optional i64 id, + 3: optional bool stop, } enum TaskResult { diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index a43a326b0bd..7a492d15294 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -152,7 +152,9 @@ void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) { void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) { std::string log = encodeHost(OP_ADD_LEARNER, learner); sendCommandAsync(std::move(log)) - .thenValue([callback = std::move(cb)] (AppendLogResult res) mutable { + .thenValue([callback = std::move(cb), learner, this] (AppendLogResult res) mutable { + LOG(INFO) << idStr_ << "add learner " << learner + << ", result: " << static_cast(toResultCode(res)); callback(toResultCode(res)); }); } @@ -160,7 +162,9 @@ void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) { void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) { std::string log = encodeHost(OP_TRANS_LEADER, target); sendCommandAsync(std::move(log)) - .thenValue([callback = std::move(cb)] (AppendLogResult res) mutable { + .thenValue([callback = std::move(cb), target, this] (AppendLogResult res) mutable { + LOG(INFO) << idStr_ << "transfer leader to " << target + << ", result: " << static_cast(toResultCode(res)); callback(toResultCode(res)); }); } @@ -168,7 +172,9 @@ void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) { void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) { std::string log = encodeHost(OP_ADD_PEER, peer); sendCommandAsync(std::move(log)) - .thenValue([callback = std::move(cb)] (AppendLogResult res) mutable { + .thenValue([callback = std::move(cb), peer, this] (AppendLogResult res) mutable { + LOG(INFO) << idStr_ << "add peer " << peer + << ", result: " << static_cast(toResultCode(res)); callback(toResultCode(res)); }); } @@ -176,7 +182,9 @@ void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) { void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) { std::string log = encodeHost(OP_REMOVE_PEER, peer); sendCommandAsync(std::move(log)) - .thenValue([callback = std::move(cb)] (AppendLogResult res) mutable { + .thenValue([callback = std::move(cb), peer, this] (AppendLogResult res) mutable { + LOG(INFO) << idStr_ << "remove peer " << peer + << ", result: " << static_cast(toResultCode(res)); callback(toResultCode(res)); }); } @@ -360,6 +368,7 @@ bool Part::preProcessLog(LogID logId, auto learner = decodeHost(OP_ADD_LEARNER, log); auto ts = getTimestamp(log); if (ts > startTimeMs_) { + LOG(INFO) << idStr_ << "preprocess add learner " << learner; addLearner(learner); } else { LOG(INFO) << idStr_ << "Skip stale add learner " << learner; @@ -370,6 +379,7 @@ bool Part::preProcessLog(LogID logId, auto newLeader = decodeHost(OP_TRANS_LEADER, log); auto ts = getTimestamp(log); if (ts > startTimeMs_) { + LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader; preProcessTransLeader(newLeader); } else { LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader; @@ -380,6 +390,7 @@ bool Part::preProcessLog(LogID logId, auto peer = decodeHost(OP_ADD_PEER, log); auto ts = getTimestamp(log); if (ts > startTimeMs_) { + LOG(INFO) << idStr_ << "preprocess add peer " << peer; addPeer(peer); } else { LOG(INFO) << idStr_ << "Skip stale add peer " << peer; @@ -390,6 +401,7 @@ bool Part::preProcessLog(LogID logId, auto peer = decodeHost(OP_REMOVE_PEER, log); auto ts = getTimestamp(log); if (ts > startTimeMs_) { + LOG(INFO) << idStr_ << "preprocess remove peer " << peer; preProcessRemovePeer(peer); } else { LOG(INFO) << idStr_ << "Skip stale remove peer " << peer; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 29760b70fe2..ae78ff2a70e 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1377,7 +1377,8 @@ void RaftPart::processAppendLogRequest( lastMsgRecvDur_.reset(); if (req.get_sending_snapshot() && status_ != Status::WAITING_SNAPSHOT) { - LOG(INFO) << idStr_ << "Begin to wait for the snapshot"; + LOG(INFO) << idStr_ << "Begin to wait for the snapshot" + << " " << req.get_committed_log_id(); reset(); status_ = Status::WAITING_SNAPSHOT; resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT); @@ -1709,6 +1710,7 @@ void RaftPart::reset() { AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) { std::lock_guard lck(logsLock_); + LOG(INFO) << idStr_ << "Check whether I catch up"; if (role_ != Role::LEADER) { LOG(INFO) << idStr_ << "I am not the leader"; return AppendLogResult::E_NOT_A_LEADER; diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index c1f53d65e1f..ca35c99eb98 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -416,6 +416,8 @@ Status MetaClient::handleResponse(const RESP& resp) { return Status::Error("The balancer is running!"); case cpp2::ErrorCode::E_BAD_BALANCE_PLAN: return Status::Error("Bad balance plan!"); + case cpp2::ErrorCode::E_NO_RUNNING_BALANCE_PLAN: + return Status::Error("No running balance plan!"); } default: return Status::Error("Unknown code %d", static_cast(resp.get_code())); @@ -1199,8 +1201,11 @@ folly::Future> MetaClient::heartbeat() { return future; } -folly::Future> MetaClient::balance() { +folly::Future> MetaClient::balance(bool isStop) { cpp2::BalanceReq req; + if (isStop) { + req.set_stop(isStop); + } folly::Promise> promise; auto future = promise.getFuture(); getResponse(std::move(req), [] (auto client, auto request) { diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index 5d051b0cf95..0720ab95fa1 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -217,7 +217,7 @@ class MetaClient { // Operations for admin folly::Future> - balance(); + balance(bool isStop = false); folly::Future>> showBalance(int64_t balanceId); diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp index db6e789f5bd..e0a9e8ad632 100644 --- a/src/meta/processors/admin/BalancePlan.cpp +++ b/src/meta/processors/admin/BalancePlan.cpp @@ -181,6 +181,17 @@ bool BalancePlan::recovery(bool resume) { return true; } +void BalancePlan::stop() { + { + std::lock_guard lg(lock_); + for (auto& task : tasks_) { + task.markInvalidIfNotStarted(); + } + status_ = BalancePlan::Status::FAILED; + saveInStore(); + } +} + std::string BalancePlan::planKey() const { std::string str; str.reserve(48); diff --git a/src/meta/processors/admin/BalancePlan.h b/src/meta/processors/admin/BalancePlan.h index cc54b4a273a..0a87be42574 100644 --- a/src/meta/processors/admin/BalancePlan.h +++ b/src/meta/processors/admin/BalancePlan.h @@ -20,6 +20,7 @@ class BalancePlan { FRIEND_TEST(BalanceTest, NormalTest); FRIEND_TEST(BalanceTest, RecoveryTest); FRIEND_TEST(BalanceTest, DispatchTasksTest); + FRIEND_TEST(BalanceTest, StopBalanceDataTest); public: enum class Status : uint8_t { @@ -75,6 +76,8 @@ class BalancePlan { return tasks_; } + void stop(); + private: bool recovery(bool resume = true); diff --git a/src/meta/processors/admin/BalanceProcessor.cpp b/src/meta/processors/admin/BalanceProcessor.cpp index 54e1e1d9c8e..36fde6236e1 100644 --- a/src/meta/processors/admin/BalanceProcessor.cpp +++ b/src/meta/processors/admin/BalanceProcessor.cpp @@ -18,6 +18,22 @@ void BalanceProcessor::process(const cpp2::BalanceReq& req) { onFinished(); return; } + if (req.get_stop() != nullptr) { + if (!(*req.get_stop())) { + resp_.set_code(cpp2::ErrorCode::E_UNKNOWN); + onFinished(); + return; + } + auto ret = Balancer::instance(kvstore_)->stop(); + if (!ret.ok()) { + resp_.set_code(cpp2::ErrorCode::E_NO_RUNNING_BALANCE_PLAN); + onFinished(); + return; + } + resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + resp_.set_id(ret.value()); + return; + } if (req.get_id() != nullptr) { auto ret = Balancer::instance(kvstore_)->show(*req.get_id()); if (!ret.ok()) { diff --git a/src/meta/processors/admin/BalanceTask.h b/src/meta/processors/admin/BalanceTask.h index 692fc5a7cb7..7c5da76f807 100644 --- a/src/meta/processors/admin/BalanceTask.h +++ b/src/meta/processors/admin/BalanceTask.h @@ -24,6 +24,7 @@ class BalanceTask { FRIEND_TEST(BalanceTest, BalancePlanTest); FRIEND_TEST(BalanceTest, NormalTest); FRIEND_TEST(BalanceTest, RecoveryTest); + FRIEND_TEST(BalanceTest, StopBalanceDataTest); public: enum class Status : uint8_t { @@ -73,6 +74,13 @@ class BalanceTask { void rollback(); + void markInvalidIfNotStarted() { + if (status_ == Status::START) { + LOG(INFO) << "mark invalid of " << taskIdStr_; + ret_ = Result::INVALID; + } + } + Result result() const { return ret_; } diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp index e784af42c39..f6427dc9e5d 100644 --- a/src/meta/processors/admin/Balancer.cpp +++ b/src/meta/processors/admin/Balancer.cpp @@ -19,8 +19,8 @@ namespace nebula { namespace meta { StatusOr Balancer::balance() { - bool expected = false; - if (running_.compare_exchange_strong(expected, true)) { + std::lock_guard lg(lock_); + if (!running_) { if (!recovery()) { LOG(ERROR) << "Recovery balancer failed!"; return Status::Error("Can't do balance because there is one corruptted balance plan!"); @@ -34,6 +34,7 @@ StatusOr Balancer::balance() { } } executor_->add(std::bind(&BalancePlan::invoke, plan_.get())); + running_ = true; return plan_->id(); } CHECK(!!plan_); @@ -51,6 +52,16 @@ StatusOr Balancer::show(BalanceID id) const { return Status::Error("KV is nullptr"); } +StatusOr Balancer::stop() { + std::lock_guard lg(lock_); + if (!running_) { + return Status::Error("No running balance plan"); + } + CHECK(!!plan_); + plan_->stop(); + return plan_->id(); +} + bool Balancer::recovery() { CHECK(!plan_) << "plan should be nullptr now"; if (kv_) { @@ -78,14 +89,18 @@ bool Balancer::recovery() { CHECK_EQ(1, corruptedPlans.size()); plan_ = std::make_unique(corruptedPlans[0], kv_, client_.get()); plan_->onFinished_ = [this] () { - bool expected = true; - auto &running = this->running_; // Get the reference before the captured `this' lost - plan_.reset(); - CHECK(running.compare_exchange_strong(expected, false)); + // Get the reference before the captured `this' lost + auto &lock = this->lock_; + { + std::lock_guard lg(lock); + running_ = false; + plan_.reset(); + } }; if (!plan_->recovery()) { LOG(ERROR) << "Can't recovery plan " << corruptedPlans[0]; - plan_->onFinished_(); + running_ = false; + plan_.reset(); return false; } } @@ -99,7 +114,6 @@ bool Balancer::getAllSpaces(std::vector& spaces, kvstore::ResultCo std::unique_ptr iter; auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { - running_ = false; retCode = ret; return false; } @@ -116,6 +130,7 @@ Status Balancer::buildBalancePlan() { std::vector spaces; kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED; if (!getAllSpaces(spaces, ret)) { + running_ = false; return Status::Error("Can't access kvstore, ret = %d", static_cast(ret)); } plan_ = std::make_unique(time::WallClock::fastNowInSec(), kv_, client_.get()); @@ -126,17 +141,22 @@ Status Balancer::buildBalancePlan() { } } plan_->onFinished_ = [this] () { - bool expected = true; - auto &running = this->running_; // Get the reference before the captured `this' lost - plan_.reset(); - CHECK(running.compare_exchange_strong(expected, false)); + // Get the reference before the captured `this' lost + auto &lock = this->lock_; + { + std::lock_guard lg(lock); + running_ = false; + plan_.reset(); + } }; if (plan_->tasks_.empty()) { - plan_->onFinished_(); + running_ = false; + plan_.reset(); return Status::Balanced(); } if (!plan_->saveInStore()) { - plan_->onFinished_(); + running_ = false; + plan_.reset(); return Status::Error("Can't persist the plan"); } return Status::OK(); @@ -368,8 +388,7 @@ cpp2::ErrorCode Balancer::leaderBalance() { std::vector spaces; kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED; if (!getAllSpaces(spaces, ret)) { - LOG(ERROR) << "Can't access kvstore, ret = d" - << static_cast(ret); + LOG(ERROR) << "Can't access kvstore, ret = " << static_cast(ret); return cpp2::ErrorCode::E_STORE_FAILURE; } @@ -446,24 +465,27 @@ Balancer::buildLeaderBalancePlan(HostLeaderMap* hostLeaderMap, GraphSpaceID spac std::unordered_map> allHostParts; getHostParts(spaceId, allHostParts, totalParts); - size_t avgLoad = leaderParts / hostLeaderMap->size(); + std::unordered_set activeHosts; + for (const auto& host : *hostLeaderMap) { + // only balance leader between hosts which have valid partition + if (!allHostParts[host.first].empty()) { + activeHosts.emplace(host.first); + leaderHostParts[host.first] = std::move((*hostLeaderMap)[host.first][spaceId]); + } + } + + size_t avgLoad = leaderParts / activeHosts.size(); size_t minLoad = avgLoad; size_t maxLoad = avgLoad + 1; if (useDeviation) { - minLoad = std::ceil(static_cast (leaderParts) / hostLeaderMap->size() * + minLoad = std::ceil(static_cast (leaderParts) / activeHosts.size() * (1 - FLAGS_leader_balance_deviation)); - maxLoad = std::floor(static_cast (leaderParts) / hostLeaderMap->size() * + maxLoad = std::floor(static_cast (leaderParts) / activeHosts.size() * (1 + FLAGS_leader_balance_deviation)); } LOG(INFO) << "Build leader balance plan, expeceted min load: " << minLoad << ", max load: " << maxLoad; - std::unordered_set activeHosts; - for (const auto& host : *hostLeaderMap) { - activeHosts.emplace(host.first); - leaderHostParts[host.first] = std::move((*hostLeaderMap)[host.first][spaceId]); - } - while (true) { bool hasUnbalancedHost = false; int32_t taskCount = 0; diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h index 2289d2f5448..01843e3c752 100644 --- a/src/meta/processors/admin/Balancer.h +++ b/src/meta/processors/admin/Balancer.h @@ -44,6 +44,7 @@ class Balancer { FRIEND_TEST(BalanceTest, BalancePartsTest); FRIEND_TEST(BalanceTest, NormalTest); FRIEND_TEST(BalanceTest, RecoveryTest); + FRIEND_TEST(BalanceTest, StopBalanceDataTest); FRIEND_TEST(BalanceTest, LeaderBalancePlanTest); FRIEND_TEST(BalanceTest, SimpleLeaderBalancePlanTest); FRIEND_TEST(BalanceTest, IntersectHostsLeaderBalancePlanTest); @@ -71,6 +72,11 @@ class Balancer { * */ StatusOr show(BalanceID id) const; + /** + * Stop balance plan by canceling all waiting balance task. + * */ + StatusOr stop(); + /** * TODO(heng): rollback some balance plan. */ @@ -171,14 +177,15 @@ class Balancer { GraphSpaceID spaceId); private: - std::atomic_bool running_{false}; + bool running_{false}; kvstore::KVStore* kv_ = nullptr; std::unique_ptr client_{nullptr}; // Current running plan. std::unique_ptr plan_{nullptr}; std::unique_ptr executor_; - std::atomic_bool inLeaderBalance_{false}; + std::atomic_bool inLeaderBalance_{false}; std::unique_ptr hostLeaderMap_; + std::mutex lock_; }; } // namespace meta diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index afe2d304e1a..080848a7824 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -19,6 +19,17 @@ DECLARE_int32(wait_time_after_open_part_ms); namespace nebula { namespace meta { +class TestFaultInjectorWithSleep : public TestFaultInjector { +public: + explicit TestFaultInjectorWithSleep(std::vector sts) + : TestFaultInjector(std::move(sts)) {} + + folly::Future waitingForCatchUpData() override { + sleep(3); + return response(3); + } +}; + TEST(BalanceTaskTest, SimpleTest) { { std::vector sts(7, Status::OK()); @@ -565,6 +576,188 @@ TEST(BalanceTest, RecoveryTest) { } } +TEST(BalanceTest, StopBalanceDataTest) { + FLAGS_task_concurrency = 1; + fs::TempDir rootPath("/tmp/BalanceTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + FLAGS_expired_threshold_sec = 1; + TestUtils::createSomeHosts(kv.get()); + { + cpp2::SpaceProperties properties; + properties.set_space_name("default_space"); + properties.set_partition_num(8); + properties.set_replica_factor(3); + cpp2::CreateSpaceReq req; + req.set_properties(std::move(properties)); + auto* processor = CreateSpaceProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + ASSERT_EQ(1, resp.get_id().get_space_id()); + } + + sleep(1); + TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}}); + std::vector sts(7, Status::OK()); + std::unique_ptr injector(new TestFaultInjectorWithSleep(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + Balancer balancer(kv.get(), std::move(client)); + auto ret = balancer.balance(); + CHECK(ret.ok()); + auto balanceId = ret.value(); + + sleep(1); + LOG(INFO) << "Rebalance should still in progress"; + { + const auto& prefix = BalancePlan::prefix(); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int num = 0; + while (iter->valid()) { + auto id = BalancePlan::id(iter->key()); + auto status = BalancePlan::status(iter->val()); + ASSERT_EQ(balanceId, id); + ASSERT_EQ(BalancePlan::Status::IN_PROGRESS, status); + num++; + iter->next(); + } + ASSERT_EQ(1, num); + } + std::vector taskWaitingPartIds; + PartitionID taskStartedPartId; + { + const auto& prefix = BalanceTask::prefix(balanceId); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int32_t num = 0; + while (iter->valid()) { + BalanceTask task; + PartitionID partId = 0; + { + auto tup = BalanceTask::parseKey(iter->key()); + task.balanceId_ = std::get<0>(tup); + ASSERT_EQ(balanceId, task.balanceId_); + task.spaceId_ = std::get<1>(tup); + ASSERT_EQ(1, task.spaceId_); + task.src_ = std::get<3>(tup); + ASSERT_EQ(HostAddr(3, 3), task.src_); + partId = std::get<2>(tup); + } + { + auto tup = BalanceTask::parseVal(iter->val()); + task.status_ = std::get<0>(tup); + task.ret_ = std::get<1>(tup); + ASSERT_EQ(BalanceTask::Result::IN_PROGRESS, task.ret_); + task.srcLived_ = std::get<2>(tup); + ASSERT_FALSE(task.srcLived_); + task.startTimeMs_ = std::get<3>(tup); + // the task status would be CATCH_UP_DATA at most + if (task.status_ == BalanceTask::Status::START) { + ASSERT_EQ(task.startTimeMs_, 0); + taskWaitingPartIds.emplace_back(partId); + } else { + ASSERT_GT(task.startTimeMs_, 0); + taskStartedPartId = partId; + } + // the task has not completed yet + task.endTimeMs_ = std::get<4>(tup); + ASSERT_EQ(task.endTimeMs_, 0); + } + num++; + iter->next(); + } + ASSERT_EQ(5, taskWaitingPartIds.size()); + ASSERT_EQ(6, num); + } + + TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}}); + ret = balancer.stop(); + CHECK(ret.ok()); + ASSERT_EQ(ret.value(), balanceId); + + // wait until the only IN_PROGRESS task finished; + sleep(3); + { + const auto& prefix = BalanceTask::prefix(balanceId); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int32_t num = 0; + while (iter->valid()) { + BalanceTask task; + PartitionID partId = std::get<2>(BalanceTask::parseKey(iter->key())); + { + auto tup = BalanceTask::parseVal(iter->val()); + task.status_ = std::get<0>(tup); + task.ret_ = std::get<1>(tup); + task.startTimeMs_ = std::get<3>(tup); + task.endTimeMs_ = std::get<4>(tup); + if (partId != taskStartedPartId) { + // the task marked as invalid + ASSERT_EQ(task.status_, BalanceTask::Status::START); + ASSERT_EQ(task.ret_, BalanceTask::Result::INVALID); + // startTime = 0, endTime > 0 + ASSERT_EQ(task.startTimeMs_, 0); + ASSERT_GT(task.endTimeMs_, 0); + } else { + // the task should have been finished + ASSERT_EQ(task.status_, BalanceTask::Status::END); + ASSERT_EQ(task.ret_, BalanceTask::Result::SUCCEEDED); + // startTime > 0, endTime > 0 + ASSERT_GT(task.startTimeMs_, 0); + ASSERT_GT(task.endTimeMs_, 0); + } + } + num++; + iter->next(); + } + ASSERT_EQ(6, num); + } + + TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}}); + ret = balancer.balance(); + CHECK(ret.ok()); + // resume stopped plan + sleep(1); + { + const auto& prefix = BalanceTask::prefix(balanceId); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int32_t num = 0; + int32_t taskWaiting = 0; + int32_t taskStarted = 0; + int32_t taskEnded = 0; + while (iter->valid()) { + BalanceTask task; + { + auto tup = BalanceTask::parseVal(iter->val()); + task.status_ = std::get<0>(tup); + task.ret_ = std::get<1>(tup); + task.startTimeMs_ = std::get<3>(tup); + task.endTimeMs_ = std::get<4>(tup); + if (task.status_ == BalanceTask::Status::END) { + ++taskEnded; + } else if (task.status_ == BalanceTask::Status::START) { + ++taskWaiting; + } else { + ++taskStarted; + } + } + num++; + iter->next(); + } + ASSERT_EQ(6, num); + EXPECT_EQ(1, taskStarted); + EXPECT_EQ(1, taskEnded); + EXPECT_EQ(4, taskWaiting); + } +} + + void verifyLeaderBalancePlan(std::unordered_map> leaderCount, size_t minLoad, size_t maxLoad) { for (const auto& hostEntry : leaderCount) { diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index 87a274ef46a..9e90a11f577 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -373,6 +373,7 @@ class BalanceSentence final : public Sentence { kUnknown, kLeader, kData, + kDataStop, kShowBalancePlan, }; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index b636a7f320a..87eb6c4aaa8 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -109,7 +109,7 @@ class GraphScanner; %token KW_ORDER KW_ASC KW_LIMIT KW_OFFSET %token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN %token KW_DISTINCT KW_ALL KW_OF -%token KW_BALANCE KW_LEADER KW_DATA +%token KW_BALANCE KW_LEADER KW_DATA KW_STOP %token KW_SHORTEST KW_PATH /* symbols */ @@ -1663,6 +1663,9 @@ balance_sentence | KW_BALANCE KW_DATA INTEGER { $$ = new BalanceSentence($3); } + | KW_BALANCE KW_DATA KW_STOP { + $$ = new BalanceSentence(BalanceSentence::SubType::kDataStop); + } ; mutate_sentence diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index e51b6a3aea7..30cfae1b70d 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -123,6 +123,7 @@ LEADER ([Ll][Ee][Aa][Dd][Ee][Rr]) UUID ([Uu][Uu][Ii][Dd]) OF ([Oo][Ff]) DATA ([Dd][Aa][Tt][Aa]) +STOP ([Ss][Tt][Oo][Pp]) SHORTEST ([Ss][Hh][Oo][Rr][Tt][Ee][Ss][Tt]) PATH ([Pp][Aa][Tt][Hh]) LIMIT ([Ll][Ii][Mm][Ii][Tt]) @@ -239,6 +240,7 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]) {LEADER} { return TokenType::KW_LEADER; } {UUID} { return TokenType::KW_UUID; } {DATA} { return TokenType::KW_DATA; } +{STOP} { return TokenType::KW_STOP; } {SHORTEST} { return TokenType::KW_SHORTEST; } {PATH} { return TokenType::KW_PATH; } {LIMIT} { return TokenType::KW_LIMIT; } diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 9b472514cc8..cfcfac33f86 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -1382,6 +1382,24 @@ TEST(Parser, BalanceOperation) { auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + GQLParser parser; + std::string query = "BALANCE DATA"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "BALANCE DATA 1234567890"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "BALANCE DATA STOP"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } } TEST(Parser, CrashByFuzzer) { From 2b9259e55fa312cf6e3b47803f6095bf1138e489 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 8 Nov 2019 17:31:05 +0800 Subject: [PATCH 2/3] address dangleptr's comments --- src/meta/processors/admin/BalancePlan.cpp | 32 +++----- src/meta/processors/admin/BalancePlan.h | 6 +- .../processors/admin/BalanceProcessor.cpp | 1 + src/meta/processors/admin/BalanceTask.cpp | 2 + src/meta/processors/admin/BalanceTask.h | 7 -- src/meta/processors/admin/Balancer.cpp | 32 ++++---- src/meta/processors/admin/Balancer.h | 11 ++- src/meta/test/BalancerTest.cpp | 80 +++---------------- 8 files changed, 56 insertions(+), 115 deletions(-) diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp index e0a9e8ad632..3eef9840fd5 100644 --- a/src/meta/processors/admin/BalancePlan.cpp +++ b/src/meta/processors/admin/BalancePlan.cpp @@ -48,6 +48,7 @@ void BalancePlan::invoke() { auto taskIndex = buckets_[i][j]; tasks_[taskIndex].onFinished_ = [this, i, j]() { bool finished = false; + bool stopped = false; { std::lock_guard lg(lock_); finishedTaskNum_++; @@ -58,20 +59,23 @@ void BalancePlan::invoke() { LOG(INFO) << "Balance " << id_ << " succeeded!"; } } + stopped = stopped_; } if (finished) { CHECK_EQ(j, this->buckets_[i].size() - 1); saveInStore(true); onFinished_(); - } else { - if (j + 1 < this->buckets_[i].size()) { - auto& task = this->tasks_[this->buckets_[i][j + 1]]; - task.invoke(); + } else if (j + 1 < this->buckets_[i].size()) { + auto& task = this->tasks_[this->buckets_[i][j + 1]]; + if (stopped) { + task.ret_ = BalanceTask::Result::INVALID; } + task.invoke(); } }; // onFinished tasks_[taskIndex].onError_ = [this, i, j]() { bool finished = false; + bool stopped = false; { std::lock_guard lg(lock_); finishedTaskNum_++; @@ -85,11 +89,12 @@ void BalancePlan::invoke() { CHECK_EQ(j, this->buckets_[i].size() - 1); saveInStore(true); onFinished_(); - } else { - if (j + 1 < this->buckets_[i].size()) { - auto& task = this->tasks_[this->buckets_[i][j + 1]]; - task.invoke(); + } else if (j + 1 < this->buckets_[i].size()) { + auto& task = this->tasks_[this->buckets_[i][j + 1]]; + if (stopped) { + task.ret_ = BalanceTask::Result::INVALID; } + task.invoke(); } }; // onError } // for (auto j = 0; j < buckets_[i].size(); j++) @@ -181,17 +186,6 @@ bool BalancePlan::recovery(bool resume) { return true; } -void BalancePlan::stop() { - { - std::lock_guard lg(lock_); - for (auto& task : tasks_) { - task.markInvalidIfNotStarted(); - } - status_ = BalancePlan::Status::FAILED; - saveInStore(); - } -} - std::string BalancePlan::planKey() const { std::string str; str.reserve(48); diff --git a/src/meta/processors/admin/BalancePlan.h b/src/meta/processors/admin/BalancePlan.h index 0a87be42574..5a6ce7f501b 100644 --- a/src/meta/processors/admin/BalancePlan.h +++ b/src/meta/processors/admin/BalancePlan.h @@ -76,7 +76,10 @@ class BalancePlan { return tasks_; } - void stop(); + void stop() { + std::lock_guard lg(lock_); + stopped_ = true; + } private: bool recovery(bool resume = true); @@ -102,6 +105,7 @@ class BalancePlan { size_t finishedTaskNum_ = 0; std::function onFinished_; Status status_ = Status::NOT_START; + bool stopped_ = false; // List of task index in tasks_; using Bucket = std::vector; diff --git a/src/meta/processors/admin/BalanceProcessor.cpp b/src/meta/processors/admin/BalanceProcessor.cpp index 36fde6236e1..f36ec25f334 100644 --- a/src/meta/processors/admin/BalanceProcessor.cpp +++ b/src/meta/processors/admin/BalanceProcessor.cpp @@ -32,6 +32,7 @@ void BalanceProcessor::process(const cpp2::BalanceReq& req) { } resp_.set_code(cpp2::ErrorCode::SUCCEEDED); resp_.set_id(ret.value()); + onFinished(); return; } if (req.get_id() != nullptr) { diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp index b5e291ade51..9e07f5c71d5 100644 --- a/src/meta/processors/admin/BalanceTask.cpp +++ b/src/meta/processors/admin/BalanceTask.cpp @@ -10,6 +10,7 @@ DEFINE_int32(wait_time_after_open_part_ms, 3000, "The wait time after open part, zero means no wait"); +DECLARE_uint32(raft_heartbeat_interval_secs); namespace nebula { namespace meta { @@ -75,6 +76,7 @@ void BalanceTask::invoke() { if (FLAGS_wait_time_after_open_part_ms > 0) { usleep(FLAGS_wait_time_after_open_part_ms * 1000); } + sleep(FLAGS_raft_heartbeat_interval_secs); } invoke(); }); diff --git a/src/meta/processors/admin/BalanceTask.h b/src/meta/processors/admin/BalanceTask.h index 7c5da76f807..989534071f0 100644 --- a/src/meta/processors/admin/BalanceTask.h +++ b/src/meta/processors/admin/BalanceTask.h @@ -74,13 +74,6 @@ class BalanceTask { void rollback(); - void markInvalidIfNotStarted() { - if (status_ == Status::START) { - LOG(INFO) << "mark invalid of " << taskIdStr_; - ret_ = Result::INVALID; - } - } - Result result() const { return ret_; } diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp index f6427dc9e5d..cdbc57bfa9f 100644 --- a/src/meta/processors/admin/Balancer.cpp +++ b/src/meta/processors/admin/Balancer.cpp @@ -23,11 +23,15 @@ StatusOr Balancer::balance() { if (!running_) { if (!recovery()) { LOG(ERROR) << "Recovery balancer failed!"; + finish(); return Status::Error("Can't do balance because there is one corruptted balance plan!"); } if (plan_ == nullptr) { LOG(INFO) << "There is no corrupted plan need to recovery, so create a new one"; auto status = buildBalancePlan(); + if (!status.ok()) { + finish(); + } if (plan_ == nullptr) { LOG(ERROR) << "Create balance plan failed, status " << status.toString(); return status; @@ -89,18 +93,14 @@ bool Balancer::recovery() { CHECK_EQ(1, corruptedPlans.size()); plan_ = std::make_unique(corruptedPlans[0], kv_, client_.get()); plan_->onFinished_ = [this] () { - // Get the reference before the captured `this' lost - auto &lock = this->lock_; + auto self = plan_; { - std::lock_guard lg(lock); - running_ = false; - plan_.reset(); + std::lock_guard lg(lock_); + finish(); } }; if (!plan_->recovery()) { LOG(ERROR) << "Can't recovery plan " << corruptedPlans[0]; - running_ = false; - plan_.reset(); return false; } } @@ -130,7 +130,6 @@ Status Balancer::buildBalancePlan() { std::vector spaces; kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED; if (!getAllSpaces(spaces, ret)) { - running_ = false; return Status::Error("Can't access kvstore, ret = %d", static_cast(ret)); } plan_ = std::make_unique(time::WallClock::fastNowInSec(), kv_, client_.get()); @@ -141,22 +140,16 @@ Status Balancer::buildBalancePlan() { } } plan_->onFinished_ = [this] () { - // Get the reference before the captured `this' lost - auto &lock = this->lock_; + auto self = plan_; { - std::lock_guard lg(lock); - running_ = false; - plan_.reset(); + std::lock_guard lg(lock_); + finish(); } }; if (plan_->tasks_.empty()) { - running_ = false; - plan_.reset(); return Status::Balanced(); } if (!plan_->saveInStore()) { - running_ = false; - plan_.reset(); return Status::Error("Can't persist the plan"); } return Status::OK(); @@ -474,6 +467,11 @@ Balancer::buildLeaderBalancePlan(HostLeaderMap* hostLeaderMap, GraphSpaceID spac } } + if (activeHosts.empty()) { + LOG(ERROR) << "No active hosts"; + return leaderHostParts; + } + size_t avgLoad = leaderParts / activeHosts.size(); size_t minLoad = avgLoad; size_t maxLoad = avgLoad + 1; diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h index 01843e3c752..8622b5a1746 100644 --- a/src/meta/processors/admin/Balancer.h +++ b/src/meta/processors/admin/Balancer.h @@ -102,7 +102,14 @@ class Balancer { cpp2::ErrorCode leaderBalance(); + void finish() { + CHECK(!lock_.try_lock()); + plan_.reset(); + running_ = false; + } + bool isRunning() { + std::lock_guard lg(lock_); return running_; } @@ -177,11 +184,11 @@ class Balancer { GraphSpaceID spaceId); private: - bool running_{false}; + std::atomic_bool running_{false}; kvstore::KVStore* kv_ = nullptr; std::unique_ptr client_{nullptr}; // Current running plan. - std::unique_ptr plan_{nullptr}; + std::shared_ptr plan_{nullptr}; std::unique_ptr executor_; std::atomic_bool inLeaderBalance_{false}; std::unique_ptr hostLeaderMap_; diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 080848a7824..47acc92f177 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -625,53 +625,6 @@ TEST(BalanceTest, StopBalanceDataTest) { } ASSERT_EQ(1, num); } - std::vector taskWaitingPartIds; - PartitionID taskStartedPartId; - { - const auto& prefix = BalanceTask::prefix(balanceId); - std::unique_ptr iter; - auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); - int32_t num = 0; - while (iter->valid()) { - BalanceTask task; - PartitionID partId = 0; - { - auto tup = BalanceTask::parseKey(iter->key()); - task.balanceId_ = std::get<0>(tup); - ASSERT_EQ(balanceId, task.balanceId_); - task.spaceId_ = std::get<1>(tup); - ASSERT_EQ(1, task.spaceId_); - task.src_ = std::get<3>(tup); - ASSERT_EQ(HostAddr(3, 3), task.src_); - partId = std::get<2>(tup); - } - { - auto tup = BalanceTask::parseVal(iter->val()); - task.status_ = std::get<0>(tup); - task.ret_ = std::get<1>(tup); - ASSERT_EQ(BalanceTask::Result::IN_PROGRESS, task.ret_); - task.srcLived_ = std::get<2>(tup); - ASSERT_FALSE(task.srcLived_); - task.startTimeMs_ = std::get<3>(tup); - // the task status would be CATCH_UP_DATA at most - if (task.status_ == BalanceTask::Status::START) { - ASSERT_EQ(task.startTimeMs_, 0); - taskWaitingPartIds.emplace_back(partId); - } else { - ASSERT_GT(task.startTimeMs_, 0); - taskStartedPartId = partId; - } - // the task has not completed yet - task.endTimeMs_ = std::get<4>(tup); - ASSERT_EQ(task.endTimeMs_, 0); - } - num++; - iter->next(); - } - ASSERT_EQ(5, taskWaitingPartIds.size()); - ASSERT_EQ(6, num); - } TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}}); ret = balancer.stop(); @@ -685,41 +638,34 @@ TEST(BalanceTest, StopBalanceDataTest) { std::unique_ptr iter; auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); - int32_t num = 0; + int32_t taskEnded = 0; + int32_t taskStopped = 0; while (iter->valid()) { BalanceTask task; - PartitionID partId = std::get<2>(BalanceTask::parseKey(iter->key())); + // PartitionID partId = std::get<2>(BalanceTask::parseKey(iter->key())); { auto tup = BalanceTask::parseVal(iter->val()); task.status_ = std::get<0>(tup); task.ret_ = std::get<1>(tup); task.startTimeMs_ = std::get<3>(tup); task.endTimeMs_ = std::get<4>(tup); - if (partId != taskStartedPartId) { - // the task marked as invalid - ASSERT_EQ(task.status_, BalanceTask::Status::START); - ASSERT_EQ(task.ret_, BalanceTask::Result::INVALID); - // startTime = 0, endTime > 0 - ASSERT_EQ(task.startTimeMs_, 0); - ASSERT_GT(task.endTimeMs_, 0); + + if (task.status_ == BalanceTask::Status::END) { + taskEnded++; } else { - // the task should have been finished - ASSERT_EQ(task.status_, BalanceTask::Status::END); - ASSERT_EQ(task.ret_, BalanceTask::Result::SUCCEEDED); - // startTime > 0, endTime > 0 - ASSERT_GT(task.startTimeMs_, 0); - ASSERT_GT(task.endTimeMs_, 0); + taskStopped++; } } - num++; iter->next(); } - ASSERT_EQ(6, num); + ASSERT_EQ(1, taskEnded); + ASSERT_EQ(5, taskStopped); } TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}}); ret = balancer.balance(); CHECK(ret.ok()); + ASSERT_NE(ret.value(), balanceId); // resume stopped plan sleep(1); { @@ -728,7 +674,6 @@ TEST(BalanceTest, StopBalanceDataTest) { auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); int32_t num = 0; - int32_t taskWaiting = 0; int32_t taskStarted = 0; int32_t taskEnded = 0; while (iter->valid()) { @@ -742,8 +687,6 @@ TEST(BalanceTest, StopBalanceDataTest) { if (task.status_ == BalanceTask::Status::END) { ++taskEnded; } else if (task.status_ == BalanceTask::Status::START) { - ++taskWaiting; - } else { ++taskStarted; } } @@ -751,9 +694,8 @@ TEST(BalanceTest, StopBalanceDataTest) { iter->next(); } ASSERT_EQ(6, num); - EXPECT_EQ(1, taskStarted); + EXPECT_EQ(5, taskStarted); EXPECT_EQ(1, taskEnded); - EXPECT_EQ(4, taskWaiting); } } From 0ba61df857aacfb24cc68b32473bed09200f6f74 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 11 Nov 2019 14:16:43 +0800 Subject: [PATCH 3/3] update log, fix ut --- src/meta/processors/admin/BalanceTask.cpp | 2 +- src/meta/processors/admin/Balancer.cpp | 4 +--- src/storage/AdminProcessor.h | 5 +++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp index 9e07f5c71d5..8b2d2572fa2 100644 --- a/src/meta/processors/admin/BalanceTask.cpp +++ b/src/meta/processors/admin/BalanceTask.cpp @@ -75,8 +75,8 @@ void BalanceTask::invoke() { status_ = Status::ADD_LEARNER; if (FLAGS_wait_time_after_open_part_ms > 0) { usleep(FLAGS_wait_time_after_open_part_ms * 1000); + sleep(FLAGS_raft_heartbeat_interval_secs); } - sleep(FLAGS_raft_heartbeat_interval_secs); } invoke(); }); diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp index cdbc57bfa9f..dc35518618f 100644 --- a/src/meta/processors/admin/Balancer.cpp +++ b/src/meta/processors/admin/Balancer.cpp @@ -30,10 +30,8 @@ StatusOr Balancer::balance() { LOG(INFO) << "There is no corrupted plan need to recovery, so create a new one"; auto status = buildBalancePlan(); if (!status.ok()) { - finish(); - } - if (plan_ == nullptr) { LOG(ERROR) << "Create balance plan failed, status " << status.toString(); + finish(); return status; } } diff --git a/src/storage/AdminProcessor.h b/src/storage/AdminProcessor.h index 0a24d98d51c..d448f1c60c2 100644 --- a/src/storage/AdminProcessor.h +++ b/src/storage/AdminProcessor.h @@ -231,9 +231,10 @@ class WaitingForCatchUpDataProcessor : public BaseProcessor folly::async([this, part, peer, spaceId, partId] { int retry = FLAGS_waiting_catch_up_retry_times; while (retry-- > 0) { - LOG(INFO) << "Waiting for catching up data, peer " << peer - << ", try " << retry << " times"; auto res = part->isCatchedUp(peer); + LOG(INFO) << "Waiting for catching up data, peer " << peer + << ", remaining " << retry << " retry times" + << ", result " << static_cast(res); switch (res) { case raftex::AppendLogResult::SUCCEEDED: onFinished();