diff --git a/src/dist/nfs/nfs_client_impl.cpp b/src/dist/nfs/nfs_client_impl.cpp index 1a35847ee0..88f0dcdfa7 100644 --- a/src/dist/nfs/nfs_client_impl.cpp +++ b/src/dist/nfs/nfs_client_impl.cpp @@ -34,11 +34,14 @@ */ #include #include +#include #include "nfs_client_impl.h" namespace dsn { namespace service { +DSN_DEFINE_int32("nfs", max_copy_megabytes_rate, 500, "max rate of copying from remote node(MB/s)"); + nfs_client_impl::nfs_client_impl(nfs_opts &opts) : _opts(opts), _concurrent_copy_request_count(0), @@ -65,8 +68,20 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts) "recent_write_fail_count", COUNTER_TYPE_VOLATILE_NUMBER, "nfs client write fail count count in the recent period"); + + uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20; + // max_copy_bytes_rate should be greater than nfs_copy_block_bytes which is the max batch copy + // size once + dassert(max_copy_bytes_rate > _opts.nfs_copy_block_bytes, + "max_copy_bytes_rate should be greater than nfs_copy_block_bytes"); + _copy_token_bucket.reset(new TokenBucket(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate)); + _opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate; + + register_cli_commands(); } +nfs_client_impl::~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); } + void nfs_client_impl::begin_remote_copy(std::shared_ptr &rci, aio_task *nfs_task) { @@ -217,6 +232,8 @@ void nfs_client_impl::continue_copy() zauto_lock l(req->lock); const user_request_ptr &ureq = req->file_ctx->user_req; if (req->is_valid) { + _copy_token_bucket->consumeWithBorrowAndWait(req->size); + copy_request copy_req; copy_req.source = ureq->file_size_req.source; copy_req.file_name = req->file_ctx->file_name; @@ -502,5 +519,43 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code // notify aio_task req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0); } + +void nfs_client_impl::register_cli_commands() +{ + dsn::command_manager::instance().register_app_command( + {"nfs.max_copy_megabytes_rate"}, + "nfs.max_copy_megabytes_rate [num | DEFAULT]", + "control the max rate(MB/s) to copy file from remote node", + [this](const std::vector &args) { + std::string result("OK"); + + if (args.empty()) { + return std::to_string(_opts.max_copy_megabytes_rate); + } + + if (args[0] == "DEFAULT") { + uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20; + _copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate); + _opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate; + return result; + } + + int32_t max_copy_megabytes_rate = 0; + if (!dsn::buf2int32(args[0], max_copy_megabytes_rate) || max_copy_megabytes_rate <= 0) { + return std::string("ERR: invalid arguments"); + } + + uint32_t max_copy_bytes_rate = max_copy_megabytes_rate << 20; + if (max_copy_bytes_rate <= _opts.nfs_copy_block_bytes) { + result = std::string("ERR: max_copy_bytes_rate(max_copy_megabytes_rate << 20) " + "should be greater than nfs_copy_block_bytes:") + .append(std::to_string(_opts.nfs_copy_block_bytes)); + return result; + } + _copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate); + _opts.max_copy_megabytes_rate = max_copy_megabytes_rate; + return result; + }); +} } } diff --git a/src/dist/nfs/nfs_client_impl.h b/src/dist/nfs/nfs_client_impl.h index 852a111a4c..d7c796b489 100644 --- a/src/dist/nfs/nfs_client_impl.h +++ b/src/dist/nfs/nfs_client_impl.h @@ -39,15 +39,20 @@ #include #include #include - +#include +#include +#include #include "nfs_client.h" namespace dsn { namespace service { +using TokenBucket = folly::BasicTokenBucket; + struct nfs_opts { uint32_t nfs_copy_block_bytes; + uint32_t max_copy_megabytes_rate; int max_concurrent_remote_copy_requests; int max_concurrent_local_writes; int max_buffered_local_writes; @@ -288,7 +293,7 @@ class nfs_client_impl : public ::dsn::service::nfs_client public: nfs_client_impl(nfs_opts &opts); - virtual ~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); } + virtual ~nfs_client_impl(); // copy file request entry void begin_remote_copy(std::shared_ptr &rci, aio_task *nfs_task); @@ -309,9 +314,13 @@ class nfs_client_impl : public ::dsn::service::nfs_client void handle_completion(const user_request_ptr &req, error_code err); + void register_cli_commands(); + private: nfs_opts &_opts; + std::unique_ptr _copy_token_bucket; // rate limiter of copy from remote + std::atomic _concurrent_copy_request_count; // record concurrent request count, limited // by max_concurrent_remote_copy_requests. std::atomic _concurrent_local_write_count; // record concurrent write count, limited