Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flying_bytes for throttling. #542

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include "srv_config.hxx"

#include <atomic>
#include <cassert>

namespace nuraft {

Expand Down Expand Up @@ -78,6 +79,7 @@ public:
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, last_streamed_log_idx_(0)
, bytes_in_flight_(0)
, l_(logger)
{
reset_ls_timer();
Expand Down Expand Up @@ -317,6 +319,23 @@ public:
last_streamed_log_idx_.store(0);
}

int64_t get_bytes_in_flight() {
return bytes_in_flight_.load();
}

void bytes_in_flight_add(size_t req_size_bytes) {
bytes_in_flight_.fetch_add(req_size_bytes);
}

void bytes_in_flight_sub(size_t req_size_bytes) {
bytes_in_flight_.fetch_sub(req_size_bytes);
assert(bytes_in_flight_ >= 0);
}

void reset_bytes_in_flight() {
bytes_in_flight_.store(0);
}

void try_set_free(msg_type type, bool streaming);

bool is_lost() const { return lost_by_leader_; }
Expand All @@ -329,6 +348,7 @@ private:
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err);

Expand Down Expand Up @@ -541,6 +561,11 @@ private:
*/
std::atomic<ulong> last_streamed_log_idx_;

/**
* Current bytes of in-flight append entry requests.
*/
std::atomic<int64_t> bytes_in_flight_;

/**
* Logger instance.
*/
Expand Down
15 changes: 15 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct raft_params {
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
, max_log_gap_in_stream_(0)
, max_bytes_in_flight_in_stream_(0)
{}

/**
Expand Down Expand Up @@ -606,7 +607,21 @@ public:
*/
bool parallel_log_appending_;

/**
* If non-zero, streaming mode is enabled and `append_entries` requests are
* dispatched instantly without awaiting the response from the prior request.
*,
* The count of logs in-flight will be capped by this value, allowing it
* to function as a throttling mechanism, in conjunction with
* `max_bytes_in_flight_in_stream_`.
*/
int32 max_log_gap_in_stream_;

/**
* If non-zero, the volume of data in-flight will be restricted to this
* specified byte limit. This limitation is effective only in streaming mode.
*/
int64_t max_bytes_in_flight_in_stream_;
};

}
Expand Down
54 changes: 30 additions & 24 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ bool raft_server::request_append_entries(ptr<peer> p) {
int32 max_gap_in_stream = params->max_log_gap_in_stream_;
if (last_streamed_log_idx > 0 && max_gap_in_stream == 0) {
p_in("disable stream mode for peer %d at runtime, "
"current streamed log: %" PRIu64 "", p->get_id(),
"current streamed log: %" PRIu64 "", p->get_id(),
last_streamed_log_idx);
last_streamed_log_idx = 0;
p->reset_stream();
Expand All @@ -301,22 +301,26 @@ bool raft_server::request_append_entries(ptr<peer> p) {
m_handler = resp_handler_;

if (msg) {
streaming = streaming &&
streaming = streaming &&
msg->get_type() == msg_type::append_entries_request;
bool make_busy_result = p->is_busy();
if (streaming) {
// throttling
if (max_gap_in_stream + p->get_next_log_idx() <=
(last_streamed_log_idx + 1)) {
p_db("flying log entry exceeds %d in stream mode, "
"skip this request", max_gap_in_stream);
int64_t max_stream_bytes =
params->max_bytes_in_flight_in_stream_ > 0
? params->max_bytes_in_flight_in_stream_ : 0;

if (max_gap_in_stream + p->get_next_log_idx() <=
(last_streamed_log_idx + 1) ||
(max_stream_bytes &&
p->get_bytes_in_flight() > max_stream_bytes)) {
streaming = false;
} else {
p_tr("send following request to %d in stream mode, "
"start idx: %" PRIu64 "", (int)p->get_id(),
p_tr("send following request to %d in stream mode, "
"start idx: %" PRIu64 "", (int)p->get_id(),
msg->get_last_log_idx());
p->set_last_streamed_log_idx(
last_streamed_log_idx,
last_streamed_log_idx,
last_streamed_log_idx + msg->log_entries().size());
}
} else if (!make_busy_result) {
Expand Down Expand Up @@ -350,9 +354,12 @@ bool raft_server::request_append_entries(ptr<peer> p) {
p->inc_long_pause_warnings();
if (p->get_long_puase_warnings() < raft_server::raft_limits_.warning_limit_) {
p_wn("skipped sending msg to %d too long time, "
"last streamed idx: %" PRIu64 ""
"next log idx: %" PRIu64 ""
"in-flight: %" PRIu64 " bytes"
"last msg sent %d ms ago",
p->get_id(), last_ts_ms);

p->get_id(), p->get_last_streamed_log_idx(),
p->get_next_log_idx(), p->get_bytes_in_flight(), last_ts_ms);
} else if ( p->get_long_puase_warnings() ==
raft_server::raft_limits_.warning_limit_ ) {
p_wn("long pause warning to %d is too verbose, "
Expand All @@ -362,10 +369,8 @@ bool raft_server::request_append_entries(ptr<peer> p) {
return false;
}



bool raft_server::send_request(ptr<peer>& p,
ptr<req_msg>& msg,
bool raft_server::send_request(ptr<peer>& p,
ptr<req_msg>& msg,
rpc_handler& m_handler,
bool streaming) {
if (!p->is_manual_free()) {
Expand Down Expand Up @@ -1049,7 +1054,8 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
(int)p->get_id(), resp.get_next_idx());

int64 bs_hint = resp.get_next_batch_size_hint_in_bytes();
p_tr("peer %d batch size hint: %" PRId64 " bytes", p->get_id(), bs_hint);
p_tr("peer %d batch size hint: %" PRId64 " bytes, in-flight: %" PRId64 " bytes",
p->get_id(), bs_hint, p->get_bytes_in_flight());
p->set_next_batch_size_hint_in_bytes(bs_hint);

if (resp.get_accepted()) {
Expand Down Expand Up @@ -1078,15 +1084,15 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
ulong last_streamed_log_idx = p->get_last_streamed_log_idx();
p_tr("peer %d, max gap: %d, acceptable_precommit_idx: %" PRIu64 ", "
"last_streamed_log_idx: %" PRIu64 ", "
"last_sent: %" PRIu64 ", next_idx: %" PRIu64 "", p->get_id(),
max_gap_in_stream, acceptable_precommit_idx, last_streamed_log_idx,
"last_sent: %" PRIu64 ", next_idx: %" PRIu64 "", p->get_id(),
max_gap_in_stream, acceptable_precommit_idx, last_streamed_log_idx,
p->get_last_sent_idx(), resp.get_next_idx());
if (max_gap_in_stream > 0 &&
last_streamed_log_idx == 0 &&
last_streamed_log_idx == 0 &&
resp.get_next_idx() > 0 &&
p->get_last_sent_idx() < resp.get_next_idx() &&
p->get_last_sent_idx() < resp.get_next_idx() &&
precommit_index_ < acceptable_precommit_idx) {
p_in("start stream mode for peer %d at idx: %" PRIu64 "",
p_in("start stream mode for peer %d at idx: %" PRIu64 "",
p->get_id(), resp.get_next_idx() - 1);
p->set_last_streamed_log_idx(0, resp.get_next_idx() - 1);
}
Expand All @@ -1097,8 +1103,8 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {

// As commit might send request, so refresh streamed log idx here
last_streamed_log_idx = p->get_last_streamed_log_idx();
ulong next_idx_to_send = last_streamed_log_idx
? last_streamed_log_idx + 1
ulong next_idx_to_send = last_streamed_log_idx
? last_streamed_log_idx + 1
: resp.get_next_idx();
need_to_catchup = p->clear_pending_commit() ||
next_idx_to_send < log_store_->next_slot();
Expand Down Expand Up @@ -1144,7 +1150,7 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
"resp next %" PRIu64 ", new next log idx %" PRIu64,
p->get_id(), prev_next_log,
resp.get_next_idx(), p->get_next_log_idx() );

// disable stream
p->reset_stream();
}
Expand Down
14 changes: 14 additions & 0 deletions src/peer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ void peer::send_req( ptr<peer> myself,
}
rpc_local = rpc_;
}

size_t req_size_bytes = 0;
if (req->get_type() == append_entries_request) {
for (auto& entry: req->log_entries()) {
req_size_bytes += entry->get_buf_ptr()->size();
}
}

rpc_handler h = (rpc_handler)std::bind
( &peer::handle_rpc_result,
this,
Expand All @@ -65,9 +73,11 @@ void peer::send_req( ptr<peer> myself,
req,
pending,
streaming,
req_size_bytes,
std::placeholders::_1,
std::placeholders::_2 );
if (rpc_local) {
myself->bytes_in_flight_add(req_size_bytes);
rpc_local->send(req, h);
}
}
Expand All @@ -83,6 +93,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err )
{
Expand Down Expand Up @@ -119,6 +130,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
// WARNING:
// `set_free()` should be protected by `rpc_protector_`, otherwise
// it may free the peer even though new RPC client is already created.
bytes_in_flight_sub(req_size_bytes);
try_set_free(req->get_type(), streaming);
}
}
Expand Down Expand Up @@ -156,6 +168,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
if (cur_rpc_id == given_rpc_id) {
rpc_.reset();
reset_stream();
reset_bytes_in_flight();
try_set_free(req->get_type(), streaming);
} else {
// WARNING (MONSTOR-9378):
Expand Down Expand Up @@ -241,6 +254,7 @@ bool peer::recreate_rpc(ptr<srv_config>& config,
reset_active_timer();

reset_stream();
reset_bytes_in_flight();
set_free();
set_manual_free();
return true;
Expand Down
76 changes: 70 additions & 6 deletions tests/unit/stream_functional_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ void update_stream_params(const std::vector<RaftPkg*>& pkgs,
}
}

void update_max_fly_bytes_params(const std::vector<RaftPkg*>& pkgs,
int max_bytes_in_flight_in_stream) {
for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.max_bytes_in_flight_in_stream_ = max_bytes_in_flight_in_stream;
pp->raftServer->update_params(param);
}
}

int stream_basic_function_test() {
reset_log_files();

Expand Down Expand Up @@ -395,7 +405,7 @@ int activate_and_deactivate_stream_mode_test() {
return 0;
}

int stream_mode_throttling_test() {
int stream_mode_base_throttling_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

Expand Down Expand Up @@ -427,6 +437,7 @@ int stream_mode_throttling_test() {

// Send one more log, pending reqs should not increase
CHK_Z( append_log_in_stream_without_delivery(s1, s2_addr, 1, max_gap) );

// Need 2 * max_gap, because every append log need one commit request
int expected_delivery = 2 * max_gap + 1;
CHK_Z( drain_pending_reqs_queue(pkgs, s1, s2_addr,
Expand All @@ -442,6 +453,56 @@ int stream_mode_throttling_test() {
return 0;
}

int stream_mode_flying_bytes_throttling_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );
for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
pp->raftServer->update_params(param);
}

// Set stream mode and max flying bytes params
// idx 0-9, size is 13, idx 10-99, size is 14
int max_bytes_in_flight = 13 * 9;
update_stream_params(pkgs, 500);
update_max_fly_bytes_params(pkgs, max_bytes_in_flight);

// Append 1 log to enable stream
CHK_Z( activate_stream(s1, s2_addr) );

// Append number of max gap logs in stream mode
CHK_Z( append_log_in_stream_without_delivery(s1, s2_addr, 10, 10) );

// Send one more log, pending reqs should not increase
CHK_Z( append_log_in_stream_without_delivery(s1, s2_addr, 1, 10) );

// Need 2 * max_gap, because every append log need one commit request
int expected_delivery = 2 * 10 + 1;
CHK_Z( drain_pending_reqs_queue(pkgs, s1, s2_addr,
expected_delivery, expected_delivery) );
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();

f_base->destroy();
return 0;
}

int snapshot_transmission_in_stream_mode() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();
Expand Down Expand Up @@ -514,9 +575,10 @@ int snapshot_transmission_in_stream_mode() {
// There shouldn't be any open snapshot ctx.
CHK_Z( s1.getTestSm()->getNumOpenedUserCtxs() );

// Append two logs, it only generate 1 reqs for S3
// Append two logs, it only generate 1 reqs for S3
CHK_Z( append_log_in_stream_without_delivery(s1, s3_addr, 1, 1) );
CHK_Z( append_log_in_stream_without_delivery(s1, s3_addr, 1, 1) );

// Peer in stream mode will generate more requests, so drain S2.
// Enable stream mode for S3.
// 2 pending requests, 2 commit requests
Expand Down Expand Up @@ -563,10 +625,12 @@ int main(int argc, char** argv) {
ts.doTest( "activate and deactivate stream mode test",
activate_and_deactivate_stream_mode_test );

// Put some sleep in follower's code,
// and see if streaming throttling (max log gap) is working
ts.doTest( "stream mode throttling test",
stream_mode_throttling_test );
// Throttling test
ts.doTest( "stream mode base throttling test",
stream_mode_base_throttling_test );

ts.doTest( "stream mode flying bytes throttling test",
stream_mode_flying_bytes_throttling_test );

// Snapshot transmission in stream mode
ts.doTest( "snapshot transmission in stream mode",
Expand Down
Loading