Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support auto adjusting quorum size for 2-node cluster #119

Merged
merged 1 commit into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public:
*/
static const int32 LEAVE_LIMIT = 5;

/**
* For 2-node cluster, if the other peer is not responding for
* pre-vote more than this limit, adjust quorum size.
* Enabled only when `auto_adjust_quorum_for_small_cluster_` is on.
*/
static const int32 VOTE_LIMIT = 10;

peer( ptr<srv_config>& config,
const context& ctx,
timer_task<int32>::executor& hb_exec,
Expand Down
8 changes: 8 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ struct raft_params {
, auto_forwarding_(false)
, use_bg_thread_for_urgent_commit_(true)
, exclude_snp_receiver_from_quorum_(false)
, auto_adjust_quorum_for_small_cluster_(false)
, locking_method_type_(dual_mutex)
, return_method_(blocking)
{}
Expand Down Expand Up @@ -455,6 +456,13 @@ public:
*/
bool exclude_snp_receiver_from_quorum_;

/**
* If `true` and the size of the cluster is 2, the quorum size
* will be adjusted to 1 automatically, once one of two nodes
* becomes offline.
*/
bool auto_adjust_quorum_for_small_cluster_;

/**
* Choose the type of lock that will be used by user threads.
*/
Expand Down
10 changes: 10 additions & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ protected:
struct pre_vote_status_t {
pre_vote_status_t()
: quorum_reject_count_(0)
, failure_count_(0)
{ reset(0); }
void reset(ulong _term) {
term_ = _term;
Expand All @@ -493,7 +494,16 @@ protected:
std::atomic<int32> live_;
std::atomic<int32> dead_;
std::atomic<int32> abandoned_;

/**
* Number of pre-vote rejections by quorum.
*/
std::atomic<int32> quorum_reject_count_;

/**
* Number of pre-vote failures due to not-responding peers.
*/
std::atomic<int32> failure_count_;
};

protected:
Expand Down
30 changes: 30 additions & 0 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ void raft_server::request_append_entries() {
}

bool raft_server::request_append_entries(ptr<peer> p) {
static timer_helper chk_timer(1000*1000);

// Checking the validity of role first.
if (role_ != srv_role::leader) {
// WARNING: We should allow `write_paused_` state for
Expand All @@ -93,6 +95,34 @@ bool raft_server::request_append_entries(ptr<peer> p) {

ptr<raft_params> params = ctx_->get_params();

if ( params->auto_adjust_quorum_for_small_cluster_ &&
get_num_voting_members() == 2 &&
chk_timer.timeout_and_reset() ) {
// If auto adjust mode is on for 2-node cluster, and
// the follower is not responding, adjust the quorum.
size_t num_not_responding_peers = get_not_responding_peers();
size_t cur_quorum_size = get_quorum_for_commit();
if ( num_not_responding_peers > 0 &&
cur_quorum_size >= 1 ) {
p_wn("2-node cluster's follower is not responding long time, "
"adjust quorum to 1");
ptr<raft_params> clone = cs_new<raft_params>(*params);
clone->custom_commit_quorum_size_ = 1;
clone->custom_election_quorum_size_ = 1;
ctx_->set_params(clone);

} else if ( num_not_responding_peers == 0 &&
cur_quorum_size == 0 ) {
// Recovered.
p_wn("2-node cluster's follower is responding now, "
"restore quorum with default value");
ptr<raft_params> clone = cs_new<raft_params>(*params);
clone->custom_commit_quorum_size_ = 0;
clone->custom_election_quorum_size_ = 0;
ctx_->set_params(clone);
}
}

bool need_to_reconnect = p->need_to_reconnect();
int32 last_active_time_ms = p->get_active_timer_us() / 1000;
if ( last_active_time_ms >
Expand Down
28 changes: 28 additions & 0 deletions src/handle_vote.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,34 @@ void raft_server::request_prevote() {
}
}

int quorum_size = get_quorum_for_election();
if ( pre_vote_.live_ + pre_vote_.dead_ > 0 &&
pre_vote_.live_ + pre_vote_.dead_ < quorum_size + 1) {
// Pre-vote failed due to non-responding voters.
pre_vote_.failure_count_++;
p_wn("total %zu nodes (including this node) responded for pre-vote "
"(term %zu, live %zu, dead %zu), at least %zu nodes should "
"respond. failure count %zu",
pre_vote_.live_.load() + pre_vote_.dead_.load(),
pre_vote_.term_,
pre_vote_.live_.load(),
pre_vote_.dead_.load(),
quorum_size + 1,
pre_vote_.failure_count_.load());
}
int num_voting_members = get_num_voting_members();
if ( params->auto_adjust_quorum_for_small_cluster_ &&
num_voting_members == 2 &&
pre_vote_.failure_count_ > peer::VOTE_LIMIT ) {
// 2-node cluster's pre-vote failed due to offline node.
p_wn("2-node cluster's pre-vote is failing long time, "
"adjust quorum to 1");
ptr<raft_params> clone = cs_new<raft_params>(*params);
clone->custom_commit_quorum_size_ = 1;
clone->custom_election_quorum_size_ = 1;
ctx_->set_params(clone);
}

hb_alive_ = false;
leader_ = -1;
pre_vote_.reset(state_->get_term());
Expand Down
2 changes: 2 additions & 0 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ void raft_server::become_leader() {
next_leader_candidate_ = -1;
initialized_ = true;
pre_vote_.quorum_reject_count_ = 0;
pre_vote_.failure_count_ = 0;
data_fresh_ = true;

request_append_entries();
Expand Down Expand Up @@ -1035,6 +1036,7 @@ void raft_server::become_follower() {
initialized_ = true;
uncommitted_config_.reset();
pre_vote_.quorum_reject_count_ = 0;
pre_vote_.failure_count_ = 0;

// Drain all pending callback functions.
drop_all_pending_commit_elems();
Expand Down
216 changes: 216 additions & 0 deletions tests/unit/asio_service_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,216 @@ int async_append_handler_test() {
return 0;
}

int auto_quorum_size_test() {
reset_log_files();

std::string s1_addr = "tcp://127.0.0.1:20010";
std::string s2_addr = "tcp://127.0.0.1:20020";

RaftAsioPkg s1(1, s1_addr);
std::shared_ptr<RaftAsioPkg> s2 = std::make_shared<RaftAsioPkg>(2, s2_addr);
std::vector<RaftAsioPkg*> pkgs = {&s1, s2.get()};

_msg("launching asio-raft servers\n");
CHK_Z( launch_servers(pkgs, false) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );
TestSuite::sleep_sec(1, "wait for Raft group ready");

// Set custom term counter, and enable auto quorum size mode.
auto custom_inc_term = [](uint64_t cur_term) -> uint64_t {
return (cur_term / 10) + 10;
};
s1.raftServer->set_inc_term_func(custom_inc_term);
s2->raftServer->set_inc_term_func(custom_inc_term);

raft_params params = s1.raftServer->get_current_params();
params.auto_adjust_quorum_for_small_cluster_ = true;
s1.raftServer->update_params(params);
s2->raftServer->update_params(params);

CHK_TRUE( s1.raftServer->is_leader() );
CHK_EQ(1, s1.raftServer->get_leader());
CHK_EQ(1, s2->raftServer->get_leader());

// Replication.
for (size_t ii=0; ii<10; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1.raftServer->append_entries( {msg} );
}
TestSuite::sleep_sec(1, "wait for replication");
uint64_t committed_idx = s1.raftServer->get_committed_log_idx();

// State machine should be identical.
CHK_OK( s2->getTestSm()->isSame( *s1.getTestSm() ) );

// Shutdown S2.
s2->raftServer->shutdown();
s2.reset();

TestSuite::sleep_ms( RaftAsioPkg::HEARTBEAT_MS * 30,
"wait for quorum adjust" );

// More replication.
for (size_t ii=10; ii<11; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1.raftServer->append_entries( {msg} );
}

// Replication should succeed: committed index should be moved forward.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 1,
s1.raftServer->get_committed_log_idx() );

// Restart S2.
_msg("launching S2 again\n");
RaftAsioPkg s2_new(2, s2_addr);
CHK_Z( launch_servers({&s2_new}, false) );
TestSuite::sleep_sec(1, "wait for S2 ready");
CHK_EQ( committed_idx + 1,
s2_new.raftServer->get_committed_log_idx() );

// More replication.
for (size_t ii=11; ii<12; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1.raftServer->append_entries( {msg} );
}

// Both of them should have the same commit number.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 2,
s1.raftServer->get_committed_log_idx() );
CHK_EQ( committed_idx + 2,
s2_new.raftServer->get_committed_log_idx() );

s1.raftServer->shutdown();
s2_new.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();
return 0;
}

int auto_quorum_size_election_test() {
reset_log_files();

std::string s1_addr = "tcp://127.0.0.1:20010";
std::string s2_addr = "tcp://127.0.0.1:20020";

std::shared_ptr<RaftAsioPkg> s1 = std::make_shared<RaftAsioPkg>(1, s1_addr);
std::shared_ptr<RaftAsioPkg> s2 = std::make_shared<RaftAsioPkg>(2, s2_addr);
std::vector<RaftAsioPkg*> pkgs = {s1.get(), s2.get()};

_msg("launching asio-raft servers\n");
CHK_Z( launch_servers(pkgs, false) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );
TestSuite::sleep_sec(1, "wait for Raft group ready");

// Set custom term counter, and enable auto quorum size mode.
auto custom_inc_term = [](uint64_t cur_term) -> uint64_t {
return (cur_term / 10) + 10;
};
s1->raftServer->set_inc_term_func(custom_inc_term);
s2->raftServer->set_inc_term_func(custom_inc_term);

raft_params params = s1->raftServer->get_current_params();
params.auto_adjust_quorum_for_small_cluster_ = true;
s1->raftServer->update_params(params);
s2->raftServer->update_params(params);

CHK_TRUE( s1->raftServer->is_leader() );
CHK_EQ(1, s1->raftServer->get_leader());
CHK_EQ(1, s2->raftServer->get_leader());

// Replication.
for (size_t ii=0; ii<10; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s1->raftServer->append_entries( {msg} );
}
TestSuite::sleep_sec(1, "wait for replication");

// State machine should be identical.
CHK_OK( s2->getTestSm()->isSame( *s1->getTestSm() ) );

// Shutdown S1.
s1->raftServer->shutdown();
s1.reset();

// Wait for adjust quorum and self election.
TestSuite::sleep_ms( RaftAsioPkg::HEARTBEAT_MS * 50,
"wait for quorum adjust" );

// S2 should be a leader.
CHK_TRUE( s2->raftServer->is_leader() );
CHK_EQ(2, s2->raftServer->get_leader());
uint64_t committed_idx = s2->raftServer->get_committed_log_idx();

// More replication.
for (size_t ii=10; ii<11; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s2->raftServer->append_entries( {msg} );
}

// Replication should succeed: committed index should be moved forward.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 1,
s2->raftServer->get_committed_log_idx() );

// Restart S1.
_msg("launching S1 again\n");
RaftAsioPkg s1_new(1, s1_addr);
CHK_Z( launch_servers({&s1_new}, false) );
TestSuite::sleep_sec(1, "wait for S2 ready");
CHK_EQ( committed_idx + 1,
s1_new.raftServer->get_committed_log_idx() );

// S2 should remain as a leader.
CHK_TRUE( s2->raftServer->is_leader() );
CHK_EQ(2, s1_new.raftServer->get_leader());
CHK_EQ(2, s2->raftServer->get_leader());

// More replication.
for (size_t ii=11; ii<12; ++ii) {
std::string msg_str = std::to_string(ii);
ptr<buffer> msg = buffer::alloc(sizeof(uint32_t) + msg_str.size());
buffer_serializer bs(msg);
bs.put_str(msg_str);
s2->raftServer->append_entries( {msg} );
}

// Both of them should have the same commit number.
TestSuite::sleep_sec(1, "wait for replication");
CHK_EQ( committed_idx + 2,
s1_new.raftServer->get_committed_log_idx() );
CHK_EQ( committed_idx + 2,
s2->raftServer->get_committed_log_idx() );

s2->raftServer->shutdown();
s1_new.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();
return 0;
}

} // namespace asio_service_test;
using namespace asio_service_test;

Expand Down Expand Up @@ -788,6 +998,12 @@ int main(int argc, char** argv) {
ts.doTest( "async append handler test",
async_append_handler_test );

ts.doTest( "auto quorum size test",
auto_quorum_size_test );

ts.doTest( "auto quorum size for election test",
auto_quorum_size_election_test );

#ifdef ENABLE_RAFT_STATS
_msg("raft stats: ENABLED\n");
#else
Expand Down