Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add rate limiter for per disk when nfs copy data #944

Merged
merged 17 commits into from
Oct 27, 2021
10 changes: 8 additions & 2 deletions include/dsn/dist/nfs_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ namespace dsn {
struct remote_copy_request
{
dsn::rpc_address source;
std::string source_disk_tag;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
std::string source_dir;
std::vector<std::string> files;
std::string dest_disk_tag;
std::string dest_dir;
bool overwrite;
bool high_priority;
Expand All @@ -51,18 +53,22 @@ class nfs_node
static std::unique_ptr<nfs_node> create();

public:
aio_task_ptr copy_remote_directory(rpc_address remote,
aio_task_ptr copy_remote_directory(const rpc_address &remote,
const std::string &source_disk_tag,
const std::string &source_dir,
const std::string &dest_disk_tag,
const std::string &dest_dir,
bool overwrite,
bool high_priority,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0);
aio_task_ptr copy_remote_files(rpc_address remote,
aio_task_ptr copy_remote_files(const rpc_address &remote,
const std::string &source_disk_tag,
const std::string &source_dir,
const std::vector<std::string> &files, // empty for all
const std::string &dest_disk_tag,
const std::string &dest_dir,
bool overwrite,
bool high_priority,
Expand Down
1 change: 1 addition & 0 deletions src/common/consensus.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ struct learn_response
6:learn_state state; // learning data, including memory data and files
7:dsn.rpc_address address; // learnee's address
8:string base_local_dir; // base dir of files on learnee
9:optional string replica_disk_tag; // the disk tag of learnee located
}

struct learn_notify_response
Expand Down
3 changes: 3 additions & 0 deletions src/nfs/nfs.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct copy_request
6: i32 size;
7: bool is_last;
8: bool overwrite;
9: optional string source_disk_tag;
}

struct copy_response
Expand All @@ -29,6 +30,8 @@ struct get_file_size_request
3: list<string> file_list;
4: string source_dir;
5: bool overwrite;
6: optional string source_disk_tag;
7: optional string dest_disk_tag;
}

struct get_file_size_response
Expand Down
44 changes: 23 additions & 21 deletions src/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ DSN_DEFINE_uint32("nfs",
nfs_copy_block_bytes,
4 * 1024 * 1024,
"max block size (bytes) for each network copy");
DSN_DEFINE_int32("nfs", max_copy_rate_megabytes, 500, "max rate of copying from remote node(MB/s)");
DSN_DEFINE_int32("nfs",
max_copy_rate_megabytes_per_disk,
500,
"max rate per disk of copying from remote node(MB/s)");
DSN_TAG_VARIABLE(max_copy_rate_megabytes_per_disk, FT_MUTABLE);

DSN_DEFINE_int32("nfs",
max_concurrent_remote_copy_requests,
50,
Expand Down Expand Up @@ -105,13 +110,11 @@ nfs_client_impl::nfs_client_impl()
COUNTER_TYPE_VOLATILE_NUMBER,
"nfs client write fail count count in the recent period");

uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20;
// max_copy_rate_bytes should be greater than nfs_copy_block_bytes which is the max batch copy
// size once
dassert(max_copy_rate_bytes > FLAGS_nfs_copy_block_bytes,
dassert((FLAGS_max_copy_rate_megabytes_per_disk << 20) > FLAGS_nfs_copy_block_bytes,
"max_copy_rate_bytes should be greater than nfs_copy_block_bytes");
_copy_token_bucket.reset(new TokenBucket(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes));
current_max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes;
_copy_token_buckets = std::make_unique<utils::token_buckets>();

register_cli_commands();
}
Expand All @@ -132,6 +135,8 @@ void nfs_client_impl::begin_remote_copy(std::shared_ptr<remote_copy_request> &rc
req->file_size_req.file_list = rci->files;
req->file_size_req.source_dir = rci->source_dir;
req->file_size_req.overwrite = rci->overwrite;
req->file_size_req.__set_source_disk_tag(rci->source_disk_tag);
req->file_size_req.__set_dest_disk_tag(rci->dest_disk_tag);
req->nfs_task = nfs_task;
req->is_finished = false;

Expand Down Expand Up @@ -269,8 +274,11 @@ void nfs_client_impl::continue_copy()
zauto_lock l(req->lock);
const user_request_ptr &ureq = req->file_ctx->user_req;
if (req->is_valid) {
// todo(jiashuo1) use non-block api `consumeWithBorrowNonBlocking` or `consume`
_copy_token_bucket->consumeWithBorrowAndWait(req->size);
_copy_token_buckets->get_token_bucket(ureq->file_size_req.dest_disk_tag)
->consumeWithBorrowAndWait(req->size,
FLAGS_max_copy_rate_megabytes_per_disk << 20,
1.5 *
(FLAGS_max_copy_rate_megabytes_per_disk << 20));

copy_request copy_req;
copy_req.source = ureq->file_size_req.source;
Expand All @@ -281,6 +289,7 @@ void nfs_client_impl::continue_copy()
copy_req.source_dir = ureq->file_size_req.source_dir;
copy_req.overwrite = ureq->file_size_req.overwrite;
copy_req.is_last = req->is_last;
copy_req.__set_source_disk_tag(ureq->file_size_req.source_disk_tag);
req->remote_copy_task =
async_nfs_copy(copy_req,
[=](error_code err, copy_response &&resp) {
Expand Down Expand Up @@ -556,27 +565,21 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code
req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0);
}

// todo(jiashuo1) just for compatibility with scripts, such as
// https://github.com/apache/incubator-pegasus/blob/v2.3/scripts/pegasus_offline_node_list.sh
void nfs_client_impl::register_cli_commands()
{

static std::once_flag flag;
std::call_once(flag, [&]() {
_nfs_max_copy_rate_megabytes_cmd = dsn::command_manager::instance().register_command(
{"nfs.max_copy_rate_megabytes"},
"nfs.max_copy_rate_megabytes [num | DEFAULT]",
"control the max rate(MB/s) to copy file from remote node",
[this](const std::vector<std::string> &args) {
"nfs.max_copy_rate_megabytes_per_disk [num]",
"control the max rate(MB/s) for one disk to copy file from remote node",
[](const std::vector<std::string> &args) {
std::string result("OK");

if (args.empty()) {
return std::to_string(current_max_copy_rate_megabytes);
}

if (args[0] == "DEFAULT") {
uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20;
_copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes);
current_max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes;
return result;
return std::to_string(FLAGS_max_copy_rate_megabytes_per_disk);
}

int32_t max_copy_rate_megabytes = 0;
Expand All @@ -592,8 +595,7 @@ void nfs_client_impl::register_cli_commands()
.append(std::to_string(FLAGS_nfs_copy_block_bytes));
return result;
}
_copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes);
current_max_copy_rate_megabytes = max_copy_rate_megabytes;
FLAGS_max_copy_rate_megabytes_per_disk = max_copy_rate_megabytes;
return result;
});
});
Expand Down
4 changes: 3 additions & 1 deletion src/nfs/nfs_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <dsn/utility/TokenBucket.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/utils/token_buckets.h>

#include "nfs_types.h"
#include "nfs_code_definition.h"
Expand Down Expand Up @@ -275,7 +276,8 @@ class nfs_client_impl
void register_cli_commands();

private:
std::unique_ptr<folly::TokenBucket> _copy_token_bucket; // rate limiter of copy from remote
std::unique_ptr<dsn::utils::token_buckets>
_copy_token_buckets; // rate limiter of copy from remote

std::atomic<int> _concurrent_copy_request_count; // record concurrent request count, limited
// by max_concurrent_remote_copy_requests.
Expand Down
12 changes: 10 additions & 2 deletions src/nfs/nfs_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ std::unique_ptr<nfs_node> nfs_node::create()
return dsn::make_unique<dsn::service::nfs_node_simple>();
}

aio_task_ptr nfs_node::copy_remote_directory(rpc_address remote,
aio_task_ptr nfs_node::copy_remote_directory(const rpc_address &remote,
const std::string &source_disk_tag,
const std::string &source_dir,
const std::string &dest_disk_tag,
const std::string &dest_dir,
bool overwrite,
bool high_priority,
Expand All @@ -22,8 +24,10 @@ aio_task_ptr nfs_node::copy_remote_directory(rpc_address remote,
int hash)
{
return copy_remote_files(remote,
source_disk_tag,
source_dir,
{},
dest_disk_tag,
dest_dir,
overwrite,
high_priority,
Expand All @@ -33,9 +37,11 @@ aio_task_ptr nfs_node::copy_remote_directory(rpc_address remote,
hash);
}

aio_task_ptr nfs_node::copy_remote_files(rpc_address remote,
aio_task_ptr nfs_node::copy_remote_files(const rpc_address &remote,
const std::string &source_disk_tag,
const std::string &source_dir,
const std::vector<std::string> &files,
const std::string &dest_disk_tag,
const std::string &dest_dir,
bool overwrite,
bool high_priority,
Expand All @@ -48,8 +54,10 @@ aio_task_ptr nfs_node::copy_remote_files(rpc_address remote,

std::shared_ptr<remote_copy_request> rci = std::make_shared<remote_copy_request>();
rci->source = remote;
rci->source_disk_tag = source_disk_tag;
rci->source_dir = source_dir;
rci->files = files;
rci->dest_disk_tag = dest_disk_tag;
rci->dest_dir = dest_dir;
rci->overwrite = overwrite;
rci->high_priority = high_priority;
Expand Down
25 changes: 16 additions & 9 deletions src/nfs/nfs_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@
namespace dsn {
namespace service {

DSN_DEFINE_int32("nfs", max_send_rate_megabytes, 100, "max rate of send to remote node(MB/s)");
DSN_TAG_VARIABLE(max_send_rate_megabytes, FT_MUTABLE);
DSN_DEFINE_int32("nfs",
max_send_rate_megabytes_per_disk,
500,
"max rate per disk of send to remote node(MB/s)");
DSN_TAG_VARIABLE(max_send_rate_megabytes_per_disk, FT_MUTABLE);

DSN_DECLARE_int32(file_close_timer_interval_ms_on_server);
DSN_DECLARE_int32(file_close_expire_time_ms);
Expand All @@ -66,7 +69,7 @@ nfs_service_impl::nfs_service_impl() : ::dsn::serverlet<nfs_service_impl>("nfs")
COUNTER_TYPE_VOLATILE_NUMBER,
"nfs server copy fail count count in the recent period");

_send_token_bucket = std::make_unique<folly::DynamicTokenBucket>();
_send_token_buckets = std::make_unique<dsn::utils::token_buckets>();
register_cli_commands();
}

Expand Down Expand Up @@ -117,7 +120,8 @@ void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request,

std::shared_ptr<callback_para> cp = std::make_shared<callback_para>(std::move(reply));
cp->bb = blob(dsn::utils::make_shared_array<char>(request.size), request.size);
cp->dst_dir = std::move(request.dst_dir);
cp->dst_dir = request.dst_dir;
cp->source_disk_tag = request.source_disk_tag;
cp->file_path = std::move(file_path);
cp->hfile = hfile;
cp->offset = request.offset;
Expand All @@ -137,8 +141,10 @@ void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request,

void nfs_service_impl::internal_read_callback(error_code err, size_t sz, callback_para &cp)
{
_send_token_bucket->consumeWithBorrowAndWait(
sz, FLAGS_max_send_rate_megabytes << 20, 1.5 * (FLAGS_max_send_rate_megabytes << 20));
_send_token_buckets->get_token_bucket(cp.source_disk_tag)
->consumeWithBorrowAndWait(sz,
FLAGS_max_send_rate_megabytes_per_disk << 20,
1.5 * (FLAGS_max_send_rate_megabytes_per_disk << 20));
{
zauto_lock l(_handles_map_lock);
auto it = _handles_map.find(cp.file_path);
Expand Down Expand Up @@ -250,7 +256,8 @@ void nfs_service_impl::close_file() // release out-of-date file handle
}
}

// TODO(jiashuo1): just for compatibility, ready to delete it later
// TODO(jiashuo1): just for compatibility with scripts, such as
// https://github.com/apache/incubator-pegasus/blob/v2.3/scripts/pegasus_offline_node_list.sh
void nfs_service_impl::register_cli_commands()
{
static std::once_flag flag;
Expand All @@ -263,7 +270,7 @@ void nfs_service_impl::register_cli_commands()
std::string result("OK");

if (args.empty()) {
return std::to_string(FLAGS_max_send_rate_megabytes);
return std::to_string(FLAGS_max_send_rate_megabytes_per_disk);
}

int32_t max_send_rate_megabytes = 0;
Expand All @@ -272,7 +279,7 @@ void nfs_service_impl::register_cli_commands()
return std::string("ERR: invalid arguments");
}

FLAGS_max_send_rate_megabytes = max_send_rate_megabytes;
FLAGS_max_send_rate_megabytes_per_disk = max_send_rate_megabytes;
return result;
});
});
Expand Down
5 changes: 4 additions & 1 deletion src/nfs/nfs_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <dsn/cpp/serverlet.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/command_manager.h>
#include <dsn/utils/token_buckets.h>

#include "nfs_code_definition.h"
#include "nfs_types.h"
Expand Down Expand Up @@ -71,6 +72,7 @@ class nfs_service_impl : public ::dsn::serverlet<nfs_service_impl>
struct callback_para
{
dsn_handle_t hfile;
std::string source_disk_tag;
std::string file_path;
std::string dst_dir;
blob bb;
Expand Down Expand Up @@ -126,7 +128,8 @@ class nfs_service_impl : public ::dsn::serverlet<nfs_service_impl>

::dsn::task_ptr _file_close_timer;

std::unique_ptr<folly::DynamicTokenBucket> _send_token_bucket; // rate limiter of send to remote
std::unique_ptr<dsn::utils::token_buckets>
_send_token_buckets; // rate limiter of send to remote

perf_counter_wrapper _recent_copy_data_size;
perf_counter_wrapper _recent_copy_fail_count;
Expand Down
6 changes: 6 additions & 0 deletions src/nfs/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ TEST(nfs, basic)

aio_result r;
dsn::aio_task_ptr t = nfs->copy_remote_files(dsn::rpc_address("localhost", 20101),
"default",
".",
files,
"default",
"nfs_test_dir",
false,
false,
Expand Down Expand Up @@ -77,8 +79,10 @@ TEST(nfs, basic)

aio_result r;
dsn::aio_task_ptr t = nfs->copy_remote_files(dsn::rpc_address("localhost", 20101),
"default",
".",
files,
"default",
"nfs_test_dir",
true,
false,
Expand Down Expand Up @@ -106,7 +110,9 @@ TEST(nfs, basic)

aio_result r;
dsn::aio_task_ptr t = nfs->copy_remote_directory(dsn::rpc_address("localhost", 20101),
"default",
"nfs_test_dir",
"default",
"nfs_test_dir_copy",
false,
false,
Expand Down
9 changes: 9 additions & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ void replica::init_state()
_last_config_change_time_ms = _create_time_ms;
update_last_checkpoint_generate_time();
_private_log = nullptr;
init_disk_tag();
}

replica::~replica(void)
Expand Down Expand Up @@ -579,5 +580,13 @@ uint32_t replica::query_data_version() const
return _app->query_data_version();
}

void replica::init_disk_tag()
{
dsn::error_code err = _stub->_fs_manager.get_disk_tag(dir(), _disk_tag);
if (dsn::ERR_OK != err) {
derror_replica("get disk tag of %s failed: %s, init it to empty ", dir(), err);
}
}

} // namespace replication
} // namespace dsn
Loading