Skip to content

Commit

Permalink
Add bytes in-flight for throttling. (#542)
Browse files Browse the repository at this point in the history
* 1. Add tracking of in-flight bytes
2. Add a new throttle based on in-flight bytes

* minor fix

* handle comments

* [Update PR] Format, comment, whitespace

---------

Co-authored-by: lihzeng <[email protected]>
Co-authored-by: Jung-Sang Ahn <[email protected]>
  • Loading branch information
3 people authored Oct 17, 2024
1 parent 70b7970 commit e530af3
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 30 deletions.
25 changes: 25 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include "srv_config.hxx"

#include <atomic>
#include <cassert>

namespace nuraft {

Expand Down Expand Up @@ -78,6 +79,7 @@ public:
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, last_streamed_log_idx_(0)
, bytes_in_flight_(0)
, l_(logger)
{
reset_ls_timer();
Expand Down Expand Up @@ -317,6 +319,23 @@ public:
last_streamed_log_idx_.store(0);
}

/**
 * Read the in-flight byte counter of this peer.
 *
 * @return Value of `bytes_in_flight_` observed by a single atomic load.
 */
int64_t get_bytes_in_flight() {
    const int64_t cur_bytes = bytes_in_flight_.load();
    return cur_bytes;
}

/**
 * Atomically increase the in-flight byte counter.
 *
 * @param req_size_bytes Number of bytes to add to `bytes_in_flight_`.
 */
void bytes_in_flight_add(size_t req_size_bytes) {
    // Same conversion the atomic's `int64_t` parameter would apply implicitly.
    const int64_t delta = static_cast<int64_t>(req_size_bytes);
    bytes_in_flight_ += delta;
}

/**
 * Atomically decrease the in-flight byte counter.
 *
 * In debug builds, asserts that the counter does not drop below zero
 * as a result of this subtraction.
 *
 * @param req_size_bytes Number of bytes to subtract from `bytes_in_flight_`.
 */
void bytes_in_flight_sub(size_t req_size_bytes) {
    const int64_t delta = static_cast<int64_t>(req_size_bytes);

    // `fetch_sub` returns the value held immediately before this subtraction.
    // Checking that value (instead of re-loading the atomic afterwards) keeps
    // the check tied to this very operation, so a concurrent
    // `bytes_in_flight_add` cannot mask an underflow.
    const int64_t prev = bytes_in_flight_.fetch_sub(delta);
    assert(prev >= delta);

    // Avoid an unused-variable warning when `assert` is compiled out.
    (void)prev;
}

/**
 * Atomically set the in-flight byte counter back to zero.
 */
void reset_bytes_in_flight() {
    bytes_in_flight_ = 0;
}

void try_set_free(msg_type type, bool streaming);

bool is_lost() const { return lost_by_leader_; }
Expand All @@ -329,6 +348,7 @@ private:
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err);

Expand Down Expand Up @@ -541,6 +561,11 @@ private:
*/
std::atomic<ulong> last_streamed_log_idx_;

/**
* Current bytes of in-flight append entry requests.
*/
std::atomic<int64_t> bytes_in_flight_;

/**
* Logger instance.
*/
Expand Down
15 changes: 15 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct raft_params {
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
, max_log_gap_in_stream_(0)
, max_bytes_in_flight_in_stream_(0)
{}

/**
Expand Down Expand Up @@ -606,7 +607,21 @@ public:
*/
bool parallel_log_appending_;

/**
* If non-zero, streaming mode is enabled and `append_entries` requests are
* dispatched instantly without awaiting the response from the prior request.
*
* The count of logs in-flight will be capped by this value, allowing it
* to function as a throttling mechanism, in conjunction with
* `max_bytes_in_flight_in_stream_`.
*/
int32 max_log_gap_in_stream_;

/**
* If non-zero, the volume of data in-flight will be restricted to this
* specified byte limit. This limitation is effective only in streaming mode.
*/
int64_t max_bytes_in_flight_in_stream_;
};

}
Expand Down
54 changes: 30 additions & 24 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ bool raft_server::request_append_entries(ptr<peer> p) {
int32 max_gap_in_stream = params->max_log_gap_in_stream_;
if (last_streamed_log_idx > 0 && max_gap_in_stream == 0) {
p_in("disable stream mode for peer %d at runtime, "
"current streamed log: %" PRIu64 "", p->get_id(),
"current streamed log: %" PRIu64 "", p->get_id(),
last_streamed_log_idx);
last_streamed_log_idx = 0;
p->reset_stream();
Expand All @@ -301,22 +301,26 @@ bool raft_server::request_append_entries(ptr<peer> p) {
m_handler = resp_handler_;

if (msg) {
streaming = streaming &&
streaming = streaming &&
msg->get_type() == msg_type::append_entries_request;
bool make_busy_result = p->is_busy();
if (streaming) {
// throttling
if (max_gap_in_stream + p->get_next_log_idx() <=
(last_streamed_log_idx + 1)) {
p_db("flying log entry exceeds %d in stream mode, "
"skip this request", max_gap_in_stream);
int64_t max_stream_bytes =
params->max_bytes_in_flight_in_stream_ > 0
? params->max_bytes_in_flight_in_stream_ : 0;

if (max_gap_in_stream + p->get_next_log_idx() <=
(last_streamed_log_idx + 1) ||
(max_stream_bytes &&
p->get_bytes_in_flight() > max_stream_bytes)) {
streaming = false;
} else {
p_tr("send following request to %d in stream mode, "
"start idx: %" PRIu64 "", (int)p->get_id(),
p_tr("send following request to %d in stream mode, "
"start idx: %" PRIu64 "", (int)p->get_id(),
msg->get_last_log_idx());
p->set_last_streamed_log_idx(
last_streamed_log_idx,
last_streamed_log_idx,
last_streamed_log_idx + msg->log_entries().size());
}
} else if (!make_busy_result) {
Expand Down Expand Up @@ -350,9 +354,12 @@ bool raft_server::request_append_entries(ptr<peer> p) {
p->inc_long_pause_warnings();
if (p->get_long_puase_warnings() < raft_server::raft_limits_.warning_limit_) {
p_wn("skipped sending msg to %d too long time, "
"last streamed idx: %" PRIu64 ""
"next log idx: %" PRIu64 ""
"in-flight: %" PRIu64 " bytes"
"last msg sent %d ms ago",
p->get_id(), last_ts_ms);

p->get_id(), p->get_last_streamed_log_idx(),
p->get_next_log_idx(), p->get_bytes_in_flight(), last_ts_ms);
} else if ( p->get_long_puase_warnings() ==
raft_server::raft_limits_.warning_limit_ ) {
p_wn("long pause warning to %d is too verbose, "
Expand All @@ -362,10 +369,8 @@ bool raft_server::request_append_entries(ptr<peer> p) {
return false;
}



bool raft_server::send_request(ptr<peer>& p,
ptr<req_msg>& msg,
bool raft_server::send_request(ptr<peer>& p,
ptr<req_msg>& msg,
rpc_handler& m_handler,
bool streaming) {
if (!p->is_manual_free()) {
Expand Down Expand Up @@ -1049,7 +1054,8 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
(int)p->get_id(), resp.get_next_idx());

int64 bs_hint = resp.get_next_batch_size_hint_in_bytes();
p_tr("peer %d batch size hint: %" PRId64 " bytes", p->get_id(), bs_hint);
p_tr("peer %d batch size hint: %" PRId64 " bytes, in-flight: %" PRId64 " bytes",
p->get_id(), bs_hint, p->get_bytes_in_flight());
p->set_next_batch_size_hint_in_bytes(bs_hint);

if (resp.get_accepted()) {
Expand Down Expand Up @@ -1078,15 +1084,15 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
ulong last_streamed_log_idx = p->get_last_streamed_log_idx();
p_tr("peer %d, max gap: %d, acceptable_precommit_idx: %" PRIu64 ", "
"last_streamed_log_idx: %" PRIu64 ", "
"last_sent: %" PRIu64 ", next_idx: %" PRIu64 "", p->get_id(),
max_gap_in_stream, acceptable_precommit_idx, last_streamed_log_idx,
"last_sent: %" PRIu64 ", next_idx: %" PRIu64 "", p->get_id(),
max_gap_in_stream, acceptable_precommit_idx, last_streamed_log_idx,
p->get_last_sent_idx(), resp.get_next_idx());
if (max_gap_in_stream > 0 &&
last_streamed_log_idx == 0 &&
last_streamed_log_idx == 0 &&
resp.get_next_idx() > 0 &&
p->get_last_sent_idx() < resp.get_next_idx() &&
p->get_last_sent_idx() < resp.get_next_idx() &&
precommit_index_ < acceptable_precommit_idx) {
p_in("start stream mode for peer %d at idx: %" PRIu64 "",
p_in("start stream mode for peer %d at idx: %" PRIu64 "",
p->get_id(), resp.get_next_idx() - 1);
p->set_last_streamed_log_idx(0, resp.get_next_idx() - 1);
}
Expand All @@ -1097,8 +1103,8 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {

// As commit might send request, so refresh streamed log idx here
last_streamed_log_idx = p->get_last_streamed_log_idx();
ulong next_idx_to_send = last_streamed_log_idx
? last_streamed_log_idx + 1
ulong next_idx_to_send = last_streamed_log_idx
? last_streamed_log_idx + 1
: resp.get_next_idx();
need_to_catchup = p->clear_pending_commit() ||
next_idx_to_send < log_store_->next_slot();
Expand Down Expand Up @@ -1144,7 +1150,7 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
"resp next %" PRIu64 ", new next log idx %" PRIu64,
p->get_id(), prev_next_log,
resp.get_next_idx(), p->get_next_log_idx() );

// disable stream
p->reset_stream();
}
Expand Down
14 changes: 14 additions & 0 deletions src/peer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ void peer::send_req( ptr<peer> myself,
}
rpc_local = rpc_;
}

size_t req_size_bytes = 0;
if (req->get_type() == append_entries_request) {
for (auto& entry: req->log_entries()) {
req_size_bytes += entry->get_buf_ptr()->size();
}
}

rpc_handler h = (rpc_handler)std::bind
( &peer::handle_rpc_result,
this,
Expand All @@ -65,9 +73,11 @@ void peer::send_req( ptr<peer> myself,
req,
pending,
streaming,
req_size_bytes,
std::placeholders::_1,
std::placeholders::_2 );
if (rpc_local) {
myself->bytes_in_flight_add(req_size_bytes);
rpc_local->send(req, h);
}
}
Expand All @@ -83,6 +93,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err )
{
Expand Down Expand Up @@ -119,6 +130,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
// WARNING:
// `set_free()` should be protected by `rpc_protector_`, otherwise
// it may free the peer even though new RPC client is already created.
bytes_in_flight_sub(req_size_bytes);
try_set_free(req->get_type(), streaming);
}
}
Expand Down Expand Up @@ -156,6 +168,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
if (cur_rpc_id == given_rpc_id) {
rpc_.reset();
reset_stream();
reset_bytes_in_flight();
try_set_free(req->get_type(), streaming);
} else {
// WARNING (MONSTOR-9378):
Expand Down Expand Up @@ -241,6 +254,7 @@ bool peer::recreate_rpc(ptr<srv_config>& config,
reset_active_timer();

reset_stream();
reset_bytes_in_flight();
set_free();
set_manual_free();
return true;
Expand Down
76 changes: 70 additions & 6 deletions tests/unit/stream_functional_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ void update_stream_params(const std::vector<RaftPkg*>& pkgs,
}
}

void update_max_fly_bytes_params(const std::vector<RaftPkg*>& pkgs,
int max_bytes_in_flight_in_stream) {
for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.max_bytes_in_flight_in_stream_ = max_bytes_in_flight_in_stream;
pp->raftServer->update_params(param);
}
}

int stream_basic_function_test() {
reset_log_files();

Expand Down Expand Up @@ -395,7 +405,7 @@ int activate_and_deactivate_stream_mode_test() {
return 0;
}

int stream_mode_throttling_test() {
int stream_mode_base_throttling_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

Expand Down Expand Up @@ -427,6 +437,7 @@ int stream_mode_throttling_test() {

// Send one more log, pending reqs should not increase
CHK_Z( append_log_in_stream_without_delivery(s1, s2_addr, 1, max_gap) );

// Need 2 * max_gap, because every appended log needs one commit request
int expected_delivery = 2 * max_gap + 1;
CHK_Z( drain_pending_reqs_queue(pkgs, s1, s2_addr,
Expand All @@ -442,6 +453,56 @@ int stream_mode_throttling_test() {
return 0;
}

int stream_mode_flying_bytes_throttling_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );
for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
pp->raftServer->update_params(param);
}

// Set stream mode and max flying bytes params
// idx 0-9, size is 13, idx 10-99, size is 14
int max_bytes_in_flight = 13 * 9;
update_stream_params(pkgs, 500);
update_max_fly_bytes_params(pkgs, max_bytes_in_flight);

// Append 1 log to enable stream
CHK_Z( activate_stream(s1, s2_addr) );

// Append number of max gap logs in stream mode
CHK_Z( append_log_in_stream_without_delivery(s1, s2_addr, 10, 10) );

// Send one more log, pending reqs should not increase
CHK_Z( append_log_in_stream_without_delivery(s1, s2_addr, 1, 10) );

// Need 2 * max_gap, because every append log need one commit request
int expected_delivery = 2 * 10 + 1;
CHK_Z( drain_pending_reqs_queue(pkgs, s1, s2_addr,
expected_delivery, expected_delivery) );
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();

f_base->destroy();
return 0;
}

int snapshot_transmission_in_stream_mode() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();
Expand Down Expand Up @@ -514,9 +575,10 @@ int snapshot_transmission_in_stream_mode() {
// There shouldn't be any open snapshot ctx.
CHK_Z( s1.getTestSm()->getNumOpenedUserCtxs() );

// Append two logs, it only generate 1 reqs for S3
// Append two logs; this generates only 1 request for S3
CHK_Z( append_log_in_stream_without_delivery(s1, s3_addr, 1, 1) );
CHK_Z( append_log_in_stream_without_delivery(s1, s3_addr, 1, 1) );

// Peer in stream mode will generate more requests, so drain S2.
// Enable stream mode for S3.
// 2 pending requests, 2 commit requests
Expand Down Expand Up @@ -563,10 +625,12 @@ int main(int argc, char** argv) {
ts.doTest( "activate and deactivate stream mode test",
activate_and_deactivate_stream_mode_test );

// Put some sleep in follower's code,
// and see if streaming throttling (max log gap) is working
ts.doTest( "stream mode throttling test",
stream_mode_throttling_test );
// Throttling test
ts.doTest( "stream mode base throttling test",
stream_mode_base_throttling_test );

ts.doTest( "stream mode flying bytes throttling test",
stream_mode_flying_bytes_throttling_test );

// Snapshot transmission in stream mode
ts.doTest( "snapshot transmission in stream mode",
Expand Down

0 comments on commit e530af3

Please sign in to comment.