Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Balance stop #1238

Merged
merged 5 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/graph/BalanceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ void BalanceExecutor::execute() {
case BalanceSentence::SubType::kData:
balanceData();
break;
case BalanceSentence::SubType::kDataStop:
balanceData(true);
break;
case BalanceSentence::SubType::kShowBalancePlan:
showBalancePlan();
break;
Expand Down Expand Up @@ -66,8 +69,8 @@ void BalanceExecutor::balanceLeader() {
std::move(future).via(runner).thenValue(cb).thenError(error);
}

void BalanceExecutor::balanceData() {
auto future = ectx()->getMetaClient()->balance();
void BalanceExecutor::balanceData(bool isStop) {
auto future = ectx()->getMetaClient()->balance(isStop);
auto *runner = ectx()->rctx()->runner();

auto cb = [this] (auto &&resp) {
Expand Down
4 changes: 3 additions & 1 deletion src/graph/BalanceExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class BalanceExecutor final : public Executor {

void balanceLeader();

void balanceData();
void balanceData(bool isStop = false);

void stopBalanceData();

void showBalancePlan();

Expand Down
2 changes: 2 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum ErrorCode {
E_STORE_SEGMENT_ILLEGAL = -32,
E_BAD_BALANCE_PLAN = -33,
E_BALANCED = -34,
E_NO_RUNNING_BALANCE_PLAN = -35,

E_INVALID_PASSWORD = -41,
E_INPROPER_ROLE = -42,
Expand Down Expand Up @@ -433,6 +434,7 @@ struct BalanceReq {
1: optional common.GraphSpaceID space_id,
// Specify the balance id to check the status of the related balance plan
2: optional i64 id,
3: optional bool stop,
}

enum TaskResult {
Expand Down
20 changes: 16 additions & 4 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,31 +152,39 @@ void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
// Encodes `learner` into an OP_ADD_LEARNER command, submits it with sendCommandAsync(),
// and, when the returned future resolves, logs the converted result code and hands
// that code to `cb`.
//
// NOTE(review): this text is a rendered diff with the +/- markers stripped. The two
// consecutive `.thenValue(` lines below appear to be the pre-change and post-change
// versions of one line; only the second (capturing `learner` and `this`) is consistent
// with the LOG statement, which reads `idStr_` and `learner`.
// NOTE(review): the continuation captures `this`; presumably the Part is guaranteed to
// outlive the pending command -- confirm against the owner of Part.
void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
std::string log = encodeHost(OP_ADD_LEARNER, learner);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb)] (AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), learner, this] (AppendLogResult res) mutable {
LOG(INFO) << idStr_ << "add learner " << learner
<< ", result: " << static_cast<int32_t>(toResultCode(res));
// Report the same converted code that was just logged.
callback(toResultCode(res));
});
}

// Encodes `target` into an OP_TRANS_LEADER command, submits it with sendCommandAsync(),
// and, when the returned future resolves, logs the converted result code and hands
// that code to `cb`.
//
// NOTE(review): this text is a rendered diff with the +/- markers stripped. The two
// consecutive `.thenValue(` lines below appear to be the pre-change and post-change
// versions of one line; only the second (capturing `target` and `this`) is consistent
// with the LOG statement, which reads `idStr_` and `target`.
// NOTE(review): the continuation captures `this`; presumably the Part is guaranteed to
// outlive the pending command -- confirm against the owner of Part.
void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) {
std::string log = encodeHost(OP_TRANS_LEADER, target);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb)] (AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), target, this] (AppendLogResult res) mutable {
LOG(INFO) << idStr_ << "transfer leader to " << target
<< ", result: " << static_cast<int32_t>(toResultCode(res));
// Report the same converted code that was just logged.
callback(toResultCode(res));
});
}

// Encodes `peer` into an OP_ADD_PEER command, submits it with sendCommandAsync(),
// and, when the returned future resolves, logs the converted result code and hands
// that code to `cb`.
//
// NOTE(review): this text is a rendered diff with the +/- markers stripped. The two
// consecutive `.thenValue(` lines below appear to be the pre-change and post-change
// versions of one line; only the second (capturing `peer` and `this`) is consistent
// with the LOG statement, which reads `idStr_` and `peer`.
// NOTE(review): the continuation captures `this`; presumably the Part is guaranteed to
// outlive the pending command -- confirm against the owner of Part.
void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_ADD_PEER, peer);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb)] (AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), peer, this] (AppendLogResult res) mutable {
LOG(INFO) << idStr_ << "add peer " << peer
<< ", result: " << static_cast<int32_t>(toResultCode(res));
// Report the same converted code that was just logged.
callback(toResultCode(res));
});
}

// Encodes `peer` into an OP_REMOVE_PEER command, submits it with sendCommandAsync(),
// and, when the returned future resolves, logs the converted result code and hands
// that code to `cb`.
//
// NOTE(review): this text is a rendered diff with the +/- markers stripped. The two
// consecutive `.thenValue(` lines below appear to be the pre-change and post-change
// versions of one line; only the second (capturing `peer` and `this`) is consistent
// with the LOG statement, which reads `idStr_` and `peer`.
// NOTE(review): the continuation captures `this`; presumably the Part is guaranteed to
// outlive the pending command -- confirm against the owner of Part.
void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_REMOVE_PEER, peer);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb)] (AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), peer, this] (AppendLogResult res) mutable {
LOG(INFO) << idStr_ << "remove peer " << peer
<< ", result: " << static_cast<int32_t>(toResultCode(res));
// Report the same converted code that was just logged.
callback(toResultCode(res));
});
}
Expand Down Expand Up @@ -360,6 +368,7 @@ bool Part::preProcessLog(LogID logId,
auto learner = decodeHost(OP_ADD_LEARNER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
} else {
LOG(INFO) << idStr_ << "Skip stale add learner " << learner;
Expand All @@ -370,6 +379,7 @@ bool Part::preProcessLog(LogID logId,
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader;
Expand All @@ -380,6 +390,7 @@ bool Part::preProcessLog(LogID logId,
auto peer = decodeHost(OP_ADD_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale add peer " << peer;
Expand All @@ -390,6 +401,7 @@ bool Part::preProcessLog(LogID logId,
auto peer = decodeHost(OP_REMOVE_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
} else {
LOG(INFO) << idStr_ << "Skip stale remove peer " << peer;
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,8 @@ void RaftPart::processAppendLogRequest(
lastMsgRecvDur_.reset();

if (req.get_sending_snapshot() && status_ != Status::WAITING_SNAPSHOT) {
LOG(INFO) << idStr_ << "Begin to wait for the snapshot";
LOG(INFO) << idStr_ << "Begin to wait for the snapshot"
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
<< " " << req.get_committed_log_id();
reset();
status_ = Status::WAITING_SNAPSHOT;
resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT);
Expand Down Expand Up @@ -1709,6 +1710,7 @@ void RaftPart::reset() {

AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) {
std::lock_guard<std::mutex> lck(logsLock_);
LOG(INFO) << idStr_ << "Check whether I catch up";
if (role_ != Role::LEADER) {
LOG(INFO) << idStr_ << "I am not the leader";
return AppendLogResult::E_NOT_A_LEADER;
Expand Down
7 changes: 6 additions & 1 deletion src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("The balancer is running!");
case cpp2::ErrorCode::E_BAD_BALANCE_PLAN:
return Status::Error("Bad balance plan!");
case cpp2::ErrorCode::E_NO_RUNNING_BALANCE_PLAN:
return Status::Error("No running balance plan!");
}
default:
return Status::Error("Unknown code %d", static_cast<int32_t>(resp.get_code()));
Expand Down Expand Up @@ -1154,8 +1156,11 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
return future;
}

folly::Future<StatusOr<int64_t>> MetaClient::balance() {
folly::Future<StatusOr<int64_t>> MetaClient::balance(bool isStop) {
cpp2::BalanceReq req;
if (isStop) {
req.set_stop(isStop);
}
folly::Promise<StatusOr<int64_t>> promise;
auto future = promise.getFuture();
getResponse(std::move(req), [] (auto client, auto request) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class MetaClient {

// Operations for admin
folly::Future<StatusOr<int64_t>>
balance();
balance(bool isStop = false);

folly::Future<StatusOr<std::vector<cpp2::BalanceTask>>>
showBalance(int64_t balanceId);
Expand Down
21 changes: 13 additions & 8 deletions src/meta/processors/admin/BalancePlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void BalancePlan::invoke() {
auto taskIndex = buckets_[i][j];
tasks_[taskIndex].onFinished_ = [this, i, j]() {
bool finished = false;
bool stopped = false;
{
std::lock_guard<std::mutex> lg(lock_);
finishedTaskNum_++;
Expand All @@ -58,20 +59,23 @@ void BalancePlan::invoke() {
LOG(INFO) << "Balance " << id_ << " succeeded!";
}
}
stopped = stopped_;
}
if (finished) {
CHECK_EQ(j, this->buckets_[i].size() - 1);
saveInStore(true);
onFinished_();
} else {
if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
task.invoke();
} else if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
if (stopped) {
task.ret_ = BalanceTask::Result::INVALID;
}
task.invoke();
}
}; // onFinished
tasks_[taskIndex].onError_ = [this, i, j]() {
bool finished = false;
bool stopped = false;
{
std::lock_guard<std::mutex> lg(lock_);
finishedTaskNum_++;
Expand All @@ -85,11 +89,12 @@ void BalancePlan::invoke() {
CHECK_EQ(j, this->buckets_[i].size() - 1);
saveInStore(true);
onFinished_();
} else {
if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
task.invoke();
} else if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
if (stopped) {
task.ret_ = BalanceTask::Result::INVALID;
}
task.invoke();
}
}; // onError
} // for (auto j = 0; j < buckets_[i].size(); j++)
Expand Down
7 changes: 7 additions & 0 deletions src/meta/processors/admin/BalancePlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class BalancePlan {
FRIEND_TEST(BalanceTest, NormalTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, DispatchTasksTest);
FRIEND_TEST(BalanceTest, StopBalanceDataTest);

public:
enum class Status : uint8_t {
Expand Down Expand Up @@ -75,6 +76,11 @@ class BalancePlan {
return tasks_;
}

// Marks this plan as stopped. The flag is written while holding lock_, the same
// mutex the task completion callbacks in invoke() take before reading stopped_.
void stop() {
    std::lock_guard<std::mutex> guard(lock_);
    stopped_ = true;
}

private:
bool recovery(bool resume = true);

Expand All @@ -99,6 +105,7 @@ class BalancePlan {
size_t finishedTaskNum_ = 0;
std::function<void()> onFinished_;
Status status_ = Status::NOT_START;
bool stopped_ = false;

// List of task index in tasks_;
using Bucket = std::vector<int32_t>;
Expand Down
17 changes: 17 additions & 0 deletions src/meta/processors/admin/BalanceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ void BalanceProcessor::process(const cpp2::BalanceReq& req) {
onFinished();
return;
}
if (req.get_stop() != nullptr) {
if (!(*req.get_stop())) {
resp_.set_code(cpp2::ErrorCode::E_UNKNOWN);
onFinished();
return;
}
auto ret = Balancer::instance(kvstore_)->stop();
if (!ret.ok()) {
resp_.set_code(cpp2::ErrorCode::E_NO_RUNNING_BALANCE_PLAN);
onFinished();
return;
}
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
resp_.set_id(ret.value());
onFinished();
return;
}
if (req.get_id() != nullptr) {
auto ret = Balancer::instance(kvstore_)->show(*req.get_id());
if (!ret.ok()) {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/admin/BalanceTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

DEFINE_int32(wait_time_after_open_part_ms, 3000,
"The wait time after open part, zero means no wait");
DECLARE_uint32(raft_heartbeat_interval_secs);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -74,6 +75,7 @@ void BalanceTask::invoke() {
status_ = Status::ADD_LEARNER;
if (FLAGS_wait_time_after_open_part_ms > 0) {
usleep(FLAGS_wait_time_after_open_part_ms * 1000);
sleep(FLAGS_raft_heartbeat_interval_secs);
}
}
invoke();
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/admin/BalanceTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BalanceTask {
FRIEND_TEST(BalanceTest, BalancePlanTest);
FRIEND_TEST(BalanceTest, NormalTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, StopBalanceDataTest);

public:
enum class Status : uint8_t {
Expand Down
Loading