Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add rate limit for learning and support remote-command #461

Merged
merged 8 commits into from
May 19, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix double free
foreverneverer committed May 15, 2020
commit 00a48aa393ddeeda6541072e7273a9dfbeeed5b9
55 changes: 55 additions & 0 deletions src/dist/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
@@ -34,11 +34,14 @@
*/
#include <dsn/utility/filesystem.h>
#include <queue>
#include <dsn/tool-api/command_manager.h>
#include "nfs_client_impl.h"

namespace dsn {
namespace service {

DSN_DEFINE_int32("nfs", max_copy_megabytes_rate, 500, "max rate of copying from remote node(MB/s)");

nfs_client_impl::nfs_client_impl(nfs_opts &opts)
: _opts(opts),
_concurrent_copy_request_count(0),
@@ -65,8 +68,20 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts)
"recent_write_fail_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"nfs client write fail count count in the recent period");

uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20;
// max_copy_bytes_rate should be greater than nfs_copy_block_bytes which is the max batch copy
// size once
dassert(max_copy_bytes_rate > _opts.nfs_copy_block_bytes,
"max_copy_bytes_rate should be greater than nfs_copy_block_bytes");
_copy_token_bucket.reset(new TokenBucket(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate));
_opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate;

register_cli_commands();
}

nfs_client_impl::~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); }
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

void nfs_client_impl::begin_remote_copy(std::shared_ptr<remote_copy_request> &rci,
aio_task *nfs_task)
{
@@ -217,6 +232,8 @@ void nfs_client_impl::continue_copy()
zauto_lock l(req->lock);
const user_request_ptr &ureq = req->file_ctx->user_req;
if (req->is_valid) {
_copy_token_bucket->consumeWithBorrowAndWait(req->size);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

copy_request copy_req;
copy_req.source = ureq->file_size_req.source;
copy_req.file_name = req->file_ctx->file_name;
@@ -502,5 +519,43 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code
// notify aio_task
req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0);
}

void nfs_client_impl::register_cli_commands()
{
dsn::command_manager::instance().register_app_command(
{"nfs.max_copy_megabytes_rate"},
"nfs.max_copy_megabytes_rate [num | DEFAULT]",
"control the max rate(MB/s) to copy file from remote node",
[this](const std::vector<std::string> &args) {
std::string result("OK");

if (args.empty()) {
return std::to_string(_opts.max_copy_megabytes_rate);
}

if (args[0] == "DEFAULT") {
uint32_t max_copy_bytes_rate = FLAGS_max_copy_megabytes_rate << 20;
_copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate);
_opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate;
return result;
}

int32_t max_copy_megabytes_rate = 0;
if (!dsn::buf2int32(args[0], max_copy_megabytes_rate) || max_copy_megabytes_rate <= 0) {
return std::string("ERR: invalid arguments");
}

uint32_t max_copy_bytes_rate = max_copy_megabytes_rate << 20;
if (max_copy_bytes_rate <= _opts.nfs_copy_block_bytes) {
result = std::string("ERR: max_copy_bytes_rate(max_copy_megabytes_rate << 20) "
"should be greater than nfs_copy_block_bytes:")
.append(std::to_string(_opts.nfs_copy_block_bytes));
return result;
}
_copy_token_bucket->reset(max_copy_bytes_rate, 1.5 * max_copy_bytes_rate);
_opts.max_copy_megabytes_rate = max_copy_megabytes_rate;
return result;
});
}
}
}
13 changes: 11 additions & 2 deletions src/dist/nfs/nfs_client_impl.h
Original file line number Diff line number Diff line change
@@ -39,15 +39,20 @@
#include <dsn/tool-api/zlocks.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/nfs_node.h>

#include <dsn/utility/defer.h>
#include <dsn/utility/TokenBucket.h>
#include <dsn/utility/flags.h>
#include "nfs_client.h"

namespace dsn {
namespace service {

using TokenBucket = folly::BasicTokenBucket<std::chrono::steady_clock>;

struct nfs_opts
{
uint32_t nfs_copy_block_bytes;
uint32_t max_copy_megabytes_rate;
int max_concurrent_remote_copy_requests;
int max_concurrent_local_writes;
int max_buffered_local_writes;
@@ -288,7 +293,7 @@ class nfs_client_impl : public ::dsn::service::nfs_client

public:
nfs_client_impl(nfs_opts &opts);
virtual ~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); }
virtual ~nfs_client_impl();

// copy file request entry
void begin_remote_copy(std::shared_ptr<remote_copy_request> &rci, aio_task *nfs_task);
@@ -309,9 +314,13 @@ class nfs_client_impl : public ::dsn::service::nfs_client

void handle_completion(const user_request_ptr &req, error_code err);

void register_cli_commands();

private:
nfs_opts &_opts;

std::unique_ptr<folly::TokenBucket> _copy_token_bucket; // rate limiter of copy from remote

std::atomic<int> _concurrent_copy_request_count; // record concurrent request count, limited
// by max_concurrent_remote_copy_requests.
std::atomic<int> _concurrent_local_write_count; // record concurrent write count, limited