From c8e1a8019a59e25d77fd71ac06f0cb27e417d739 Mon Sep 17 00:00:00 2001 From: Shuo Date: Tue, 19 May 2020 14:55:30 +0800 Subject: [PATCH] feat: add rate limit for learning and support remote-command (#461) --- src/dist/block_service/fds/fds_service.cpp | 23 ++++----- src/dist/nfs/nfs_client_impl.cpp | 55 ++++++++++++++++++++++ src/dist/nfs/nfs_client_impl.h | 13 ++++- 3 files changed, 75 insertions(+), 16 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 6c4dfd28ea..a70b73dcec 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,8 +103,6 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { - const int BYTE_TO_BIT = 8; - /// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25] /// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25]. /// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration, @@ -113,30 +111,27 @@ fds_service::fds_service() uint64_t target_file_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_target_file_size_base", - 64 * 1024 * 1024, + 64 << 20, "rocksdb options.target_file_size_base"); uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_write_buffer_size", - 64 * 1024 * 1024, + 64 << 20, "rocksdb options.write_buffer_size"); uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)1.25 * write_buffer_size); uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)"); + "replication", "fds_write_limit_rate", 100, "rate limit of fds(MB/s)"); /// For write operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. /// Here we set burst_size = max_sst_file_size + 3MB, a litte greater than max_sst_file_size - uint32_t burst_size = - std::max(2 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6); - _write_token_bucket.reset( - new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); + uint32_t burst_size = std::max(2.0 * (write_rate_limit << 20), max_sst_file_size + 3e6); + _write_token_bucket.reset(new folly::TokenBucket(write_rate_limit << 20, burst_size)); uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)"); - burst_size = 2 * read_rate_limit * 1e6 / BYTE_TO_BIT; - _read_token_bucket.reset( - new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); + "replication", "fds_read_limit_rate", 100, "rate limit of fds(MB/s)"); + burst_size = 2 * read_rate_limit << 20; + _read_token_bucket.reset(new folly::TokenBucket(read_rate_limit << 20, burst_size)); } fds_service::~fds_service() {} @@ -569,7 +564,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, /*out*/ uint64_t &transfered_bytes) { // the max batch size is 1MB - const uint64_t BATCH_MAX = 1e6; + const uint64_t BATCH_MAX = 1 << 20; error_code err = ERR_OK; transfered_bytes = 0; diff --git a/src/dist/nfs/nfs_client_impl.cpp b/src/dist/nfs/nfs_client_impl.cpp index 1a35847ee0..5e9ff15457 100644 --- a/src/dist/nfs/nfs_client_impl.cpp +++ b/src/dist/nfs/nfs_client_impl.cpp @@ -34,11 +34,14 @@ */ #include #include +#include #include "nfs_client_impl.h" namespace dsn { namespace service { +DSN_DEFINE_int32("nfs", max_copy_rate_megabytes, 500, "max rate of copying from remote node(MB/s)"); + nfs_client_impl::nfs_client_impl(nfs_opts &opts) : _opts(opts), _concurrent_copy_request_count(0), @@ -65,8 +68,20 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts) "recent_write_fail_count", COUNTER_TYPE_VOLATILE_NUMBER, "nfs client write fail count count in the recent period"); + + uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20; + // max_copy_rate_bytes should be greater than nfs_copy_block_bytes which is the max batch copy + // size once + dassert(max_copy_rate_bytes > _opts.nfs_copy_block_bytes, + "max_copy_rate_bytes should be greater than nfs_copy_block_bytes"); + _copy_token_bucket.reset(new TokenBucket(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes)); + _opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes; + + register_cli_commands(); } +nfs_client_impl::~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); } + void nfs_client_impl::begin_remote_copy(std::shared_ptr &rci, aio_task *nfs_task) { @@ -217,6 +232,8 @@ void nfs_client_impl::continue_copy() zauto_lock l(req->lock); const user_request_ptr &ureq = req->file_ctx->user_req; if (req->is_valid) { + _copy_token_bucket->consumeWithBorrowAndWait(req->size); + copy_request copy_req; copy_req.source = ureq->file_size_req.source; copy_req.file_name = req->file_ctx->file_name; @@ -502,5 +519,43 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code // notify aio_task req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0); } + +void nfs_client_impl::register_cli_commands() +{ + dsn::command_manager::instance().register_app_command( + {"nfs.max_copy_rate_megabytes"}, + "nfs.max_copy_rate_megabytes [num | DEFAULT]", + "control the max rate(MB/s) to copy file from remote node", + [this](const std::vector &args) { + std::string result("OK"); + + if (args.empty()) { + return std::to_string(_opts.max_copy_rate_megabytes); + } + + if (args[0] == "DEFAULT") { + uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20; + _copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes); + _opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes; + return result; + } + + int32_t max_copy_rate_megabytes = 0; + if (!dsn::buf2int32(args[0], max_copy_rate_megabytes) || max_copy_rate_megabytes <= 0) { + return std::string("ERR: invalid arguments"); + } + + uint32_t max_copy_rate_bytes = max_copy_rate_megabytes << 20; + if (max_copy_rate_bytes <= _opts.nfs_copy_block_bytes) { + result = std::string("ERR: max_copy_rate_bytes(max_copy_rate_megabytes << 20) " + "should be greater than nfs_copy_block_bytes:") + .append(std::to_string(_opts.nfs_copy_block_bytes)); + return result; + } + _copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes); + _opts.max_copy_rate_megabytes = max_copy_rate_megabytes; + return result; + }); +} } } diff --git a/src/dist/nfs/nfs_client_impl.h b/src/dist/nfs/nfs_client_impl.h index 852a111a4c..77d568ddf2 100644 --- a/src/dist/nfs/nfs_client_impl.h +++ b/src/dist/nfs/nfs_client_impl.h @@ -39,15 +39,20 @@ #include #include #include - +#include +#include +#include #include "nfs_client.h" namespace dsn { namespace service { +using TokenBucket = folly::BasicTokenBucket; + struct nfs_opts { uint32_t nfs_copy_block_bytes; + uint32_t max_copy_rate_megabytes; int max_concurrent_remote_copy_requests; int max_concurrent_local_writes; int max_buffered_local_writes; @@ -288,7 +293,7 @@ class nfs_client_impl : public ::dsn::service::nfs_client public: nfs_client_impl(nfs_opts &opts); - virtual ~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); } + virtual ~nfs_client_impl(); // copy file request entry void begin_remote_copy(std::shared_ptr &rci, aio_task *nfs_task); @@ -309,9 +314,13 @@ class nfs_client_impl : public ::dsn::service::nfs_client void handle_completion(const user_request_ptr &req, error_code err); + void register_cli_commands(); + private: nfs_opts &_opts; + std::unique_ptr _copy_token_bucket; // rate limiter of copy from remote + std::atomic _concurrent_copy_request_count; // record concurrent request count, limited // by max_concurrent_remote_copy_requests. std::atomic _concurrent_local_write_count; // record concurrent write count, limited