From 00a48aa393ddeeda6541072e7273a9dfbeeed5b9 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Tue, 28 Apr 2020 16:44:38 +0800 Subject: [PATCH 1/4] fix double free --- src/dist/nfs/nfs_client_impl.cpp | 55 ++++++++++++++++++++++++++++++++ src/dist/nfs/nfs_client_impl.h | 13 ++++++-- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/src/dist/nfs/nfs_client_impl.cpp b/src/dist/nfs/nfs_client_impl.cpp index 1a35847ee0..88f0dcdfa7 100644 --- a/src/dist/nfs/nfs_client_impl.cpp +++ b/src/dist/nfs/nfs_client_impl.cpp @@ -34,11 +34,14 @@ */ #include #include +#include #include "nfs_client_impl.h" namespace dsn { namespace service { +DSN_DEFINE_int32("nfs", max_copy_megabytes_rate, 500, "max rate of copying from remote node(MB/s)"); + nfs_client_impl::nfs_client_impl(nfs_opts &opts) : _opts(opts), _concurrent_copy_request_count(0), @@ -65,8 +68,20 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts) "recent_write_fail_count", COUNTER_TYPE_VOLATILE_NUMBER, "nfs client write fail count count in the recent period"); + + uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20; + // max_copy_bytes_rate should be greater than nfs_copy_block_bytes which is the max batch copy + // size once + dassert(max_copy_bytes_rate > _opts.nfs_copy_block_bytes, + "max_copy_bytes_rate should be greater than nfs_copy_block_bytes"); + _copy_token_bucket.reset(new TokenBucket(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate)); + _opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate; + + register_cli_commands(); } +nfs_client_impl::~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); } + void nfs_client_impl::begin_remote_copy(std::shared_ptr &rci, aio_task *nfs_task) { @@ -217,6 +232,8 @@ void nfs_client_impl::continue_copy() zauto_lock l(req->lock); const user_request_ptr &ureq = req->file_ctx->user_req; if (req->is_valid) { + _copy_token_bucket->consumeWithBorrowAndWait(req->size); + copy_request copy_req; copy_req.source = ureq->file_size_req.source; copy_req.file_name = req->file_ctx->file_name; @@ -502,5 +519,43 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code // notify aio_task req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0); } + +void nfs_client_impl::register_cli_commands() +{ + dsn::command_manager::instance().register_app_command( + {"nfs.max_copy_megabytes_rate"}, + "nfs.max_copy_megabytes_rate [num | DEFAULT]", + "control the max rate(MB/s) to copy file from remote node", + [this](const std::vector &args) { + std::string result("OK"); + + if (args.empty()) { + return std::to_string(_opts.max_copy_megabytes_rate); + } + + if (args[0] == "DEFAULT") { + uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20; + _copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate); + _opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate; + return result; + } + + int32_t max_copy_megabytes_rate = 0; + if (!dsn::buf2int32(args[0], max_copy_megabytes_rate) || max_copy_megabytes_rate <= 0) { + return std::string("ERR: invalid arguments"); + } + + uint32_t max_copy_bytes_rate = max_copy_megabytes_rate << 20; + if (max_copy_bytes_rate <= _opts.nfs_copy_block_bytes) { + result = std::string("ERR: max_copy_bytes_rate(max_copy_megabytes_rate << 20) " + "should be greater than nfs_copy_block_bytes:") + .append(std::to_string(_opts.nfs_copy_block_bytes)); + return result; + } + _copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate); + _opts.max_copy_megabytes_rate = max_copy_megabytes_rate; + return result; + }); +} } } diff --git a/src/dist/nfs/nfs_client_impl.h b/src/dist/nfs/nfs_client_impl.h index 852a111a4c..d7c796b489 100644 --- a/src/dist/nfs/nfs_client_impl.h +++ b/src/dist/nfs/nfs_client_impl.h @@ -39,15 +39,20 @@ #include #include #include - +#include +#include +#include #include "nfs_client.h" namespace dsn { namespace service { +using TokenBucket = folly::BasicTokenBucket; + struct nfs_opts { uint32_t nfs_copy_block_bytes; + uint32_t max_copy_megabytes_rate; int max_concurrent_remote_copy_requests; int max_concurrent_local_writes; int max_buffered_local_writes; @@ -288,7 +293,7 @@ class nfs_client_impl : public ::dsn::service::nfs_client public: nfs_client_impl(nfs_opts &opts); - virtual ~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); } + virtual ~nfs_client_impl(); // copy file request entry void begin_remote_copy(std::shared_ptr &rci, aio_task *nfs_task); @@ -309,9 +314,13 @@ class nfs_client_impl : public ::dsn::service::nfs_client void handle_completion(const user_request_ptr &req, error_code err); + void register_cli_commands(); + private: nfs_opts &_opts; + std::unique_ptr _copy_token_bucket; // rate limiter of copy from remote + std::atomic _concurrent_copy_request_count; // record concurrent request count, limited // by max_concurrent_remote_copy_requests. std::atomic _concurrent_local_write_count; // record concurrent write count, limited From f3347836a9e52e8c0d7a4475ca6d1c1156d86a98 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 15 May 2020 19:28:34 +0800 Subject: [PATCH 2/4] update fds limit config --- src/dist/block_service/fds/fds_service.cpp | 23 +++++++++------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 6c4dfd28ea..0c58baffdc 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -103,8 +103,6 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { - const int BYTE_TO_BIT = 8; - /// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25] /// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25]. /// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration, @@ -113,30 +111,27 @@ fds_service::fds_service() uint64_t target_file_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_target_file_size_base", - 64 * 1024 * 1024, + 64 << 20, "rocksdb options.target_file_size_base"); uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_write_buffer_size", - 64 * 1024 * 1024, + 64 << 20, "rocksdb options.write_buffer_size"); uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)1.25 * write_buffer_size); uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)"); + "replication", "fds_write_limit_rate", 50, "rate limit of fds(MB/s)"); /// For write operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. /// Here we set burst_size = max_sst_file_size + 3MB, a litte greater than max_sst_file_size - uint32_t burst_size = - std::max(2 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6); - _write_token_bucket.reset( - new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); + uint32_t burst_size = std::max(2.0 * (write_rate_limit << 20), max_sst_file_size + 3e6); + _write_token_bucket.reset(new folly::TokenBucket(write_rate_limit << 20, burst_size)); uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)"); - burst_size = 2 * read_rate_limit * 1e6 / BYTE_TO_BIT; - _read_token_bucket.reset( - new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); + "replication", "fds_read_limit_rate", 100, "rate limit of fds(MB/s)"); + burst_size = 2 * read_rate_limit << 20; + _read_token_bucket.reset(new folly::TokenBucket(read_rate_limit << 20, burst_size)); } fds_service::~fds_service() {} @@ -569,7 +564,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, /*out*/ uint64_t &transfered_bytes) { // the max batch size is 1MB - const uint64_t BATCH_MAX = 1e6; + const uint64_t BATCH_MAX = 1 << 20; error_code err = ERR_OK; transfered_bytes = 0; From 7bf9f0f1658ea582e74842d9bf383f5ce8e02c59 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 15 May 2020 19:29:47 +0800 Subject: [PATCH 3/4] update fds limit config --- src/dist/block_service/fds/fds_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 0c58baffdc..a70b73dcec 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -120,7 +120,7 @@ fds_service::fds_service() uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)1.25 * write_buffer_size); uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64( - "replication", "fds_write_limit_rate", 50, "rate limit of fds(MB/s)"); + "replication", "fds_write_limit_rate", 100, "rate limit of fds(MB/s)"); /// For write operation, we can't send a file in batches. Because putContent interface of fds /// will overwrite what was sent before for the same file. So we must send a file as a whole. /// If file size > burst size, the file will be rejected by the token bucket. From a6a4356dbd84a99b4b9a7e990ec5832781737821 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Mon, 18 May 2020 10:07:16 +0800 Subject: [PATCH 4/4] rename --- src/dist/nfs/nfs_client_impl.cpp | 40 ++++++++++++++++---------------- src/dist/nfs/nfs_client_impl.h | 2 +- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/dist/nfs/nfs_client_impl.cpp b/src/dist/nfs/nfs_client_impl.cpp index 88f0dcdfa7..5e9ff15457 100644 --- a/src/dist/nfs/nfs_client_impl.cpp +++ b/src/dist/nfs/nfs_client_impl.cpp @@ -40,7 +40,7 @@ namespace dsn { namespace service { -DSN_DEFINE_int32("nfs", max_copy_megabytes_rate, 500, "max rate of copying from remote node(MB/s)"); +DSN_DEFINE_int32("nfs", max_copy_rate_megabytes, 500, "max rate of copying from remote node(MB/s)"); nfs_client_impl::nfs_client_impl(nfs_opts &opts) : _opts(opts), @@ -69,13 +69,13 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts) COUNTER_TYPE_VOLATILE_NUMBER, "nfs client write fail count count in the recent period"); - uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20; - // max_copy_bytes_rate should be greater than nfs_copy_block_bytes which is the max batch copy + uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20; + // max_copy_rate_bytes should be greater than nfs_copy_block_bytes which is the max batch copy // size once - dassert(max_copy_bytes_rate > _opts.nfs_copy_block_bytes, - "max_copy_bytes_rate should be greater than nfs_copy_block_bytes"); - _copy_token_bucket.reset(new TokenBucket(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate)); - _opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate; + dassert(max_copy_rate_bytes > _opts.nfs_copy_block_bytes, + "max_copy_rate_bytes should be greater than nfs_copy_block_bytes"); + _copy_token_bucket.reset(new TokenBucket(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes)); + _opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes; register_cli_commands(); } @@ -523,37 +523,37 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code void nfs_client_impl::register_cli_commands() { dsn::command_manager::instance().register_app_command( - {"nfs.max_copy_megabytes_rate"}, - "nfs.max_copy_megabytes_rate [num | DEFAULT]", + {"nfs.max_copy_rate_megabytes"}, + "nfs.max_copy_rate_megabytes [num | DEFAULT]", "control the max rate(MB/s) to copy file from remote node", [this](const std::vector &args) { std::string result("OK"); if (args.empty()) { - return std::to_string(_opts.max_copy_megabytes_rate); + return std::to_string(_opts.max_copy_rate_megabytes); } if (args[0] == "DEFAULT") { - uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20; - _copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate); - _opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate; + uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20; + _copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes); + _opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes; return result; } - int32_t max_copy_megabytes_rate = 0; - if (!dsn::buf2int32(args[0], max_copy_megabytes_rate) || max_copy_megabytes_rate <= 0) { + int32_t max_copy_rate_megabytes = 0; + if (!dsn::buf2int32(args[0], max_copy_rate_megabytes) || max_copy_rate_megabytes <= 0) { return std::string("ERR: invalid arguments"); } - uint32_t max_copy_bytes_rate = max_copy_megabytes_rate << 20; - if (max_copy_bytes_rate <= _opts.nfs_copy_block_bytes) { - result = std::string("ERR: max_copy_bytes_rate(max_copy_megabytes_rate << 20) " + uint32_t max_copy_rate_bytes = max_copy_rate_megabytes << 20; + if (max_copy_rate_bytes <= _opts.nfs_copy_block_bytes) { + result = std::string("ERR: max_copy_rate_bytes(max_copy_rate_megabytes << 20) " "should be greater than nfs_copy_block_bytes:") .append(std::to_string(_opts.nfs_copy_block_bytes)); return result; } - _copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate); - _opts.max_copy_megabytes_rate = max_copy_megabytes_rate; + _copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes); + _opts.max_copy_rate_megabytes = max_copy_rate_megabytes; return result; }); } diff --git a/src/dist/nfs/nfs_client_impl.h b/src/dist/nfs/nfs_client_impl.h index d7c796b489..77d568ddf2 100644 --- a/src/dist/nfs/nfs_client_impl.h +++ b/src/dist/nfs/nfs_client_impl.h @@ -52,7 +52,7 @@ using TokenBucket = folly::BasicTokenBucket; struct nfs_opts { uint32_t nfs_copy_block_bytes; - uint32_t max_copy_megabytes_rate; + uint32_t max_copy_rate_megabytes; int max_concurrent_remote_copy_requests; int max_concurrent_local_writes; int max_buffered_local_writes;