Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer committed May 15, 2020
1 parent 704e1f0 commit 7dd15ab
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
60 changes: 60 additions & 0 deletions src/dist/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
*/
#include <dsn/utility/filesystem.h>
#include <queue>
#include <dsn/tool-api/command_manager.h>
#include "nfs_client_impl.h"

namespace dsn {
namespace service {

DSN_DEFINE_int32("nfs", max_copy_megabytes_rate, 500, "max rate of copying from remote node(MB/s)");

nfs_client_impl::nfs_client_impl(nfs_opts &opts)
: _opts(opts),
_concurrent_copy_request_count(0),
Expand All @@ -65,6 +68,23 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts)
"recent_write_fail_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"nfs client write fail count count in the recent period");

uint32_t burst_size = 1.5 * (FLAGS_max_copy_megabytes_rate << 20);
// burst_size should be greater than nfs_copy_block_bytes which is the max batch copy size once
dassert(burst_size > _opts.nfs_copy_block_bytes,
"burst_size = 1.5 * (max_copy_megabytes_rate "
"<< 20) should be greater than "
"nfs_copy_block_bytes");
_copy_token_bucket.reset(new TokenBucket(FLAGS_max_copy_megabytes_rate << 20, burst_size));
_opts.max_copy_megabytes_rate = FLAGS_max_copy_megabytes_rate;

register_cli_commands();
}

nfs_client_impl::~nfs_client_impl()
{
_tracker.cancel_outstanding_tasks();
dsn::command_manager::instance().deregister_command(_ctrl_nfs_max_copy_megabytes_rate);
}

void nfs_client_impl::begin_remote_copy(std::shared_ptr<remote_copy_request> &rci,
Expand Down Expand Up @@ -217,6 +237,8 @@ void nfs_client_impl::continue_copy()
zauto_lock l(req->lock);
const user_request_ptr &ureq = req->file_ctx->user_req;
if (req->is_valid) {
_copy_token_bucket->consumeWithBorrowAndWait(req->size);

copy_request copy_req;
copy_req.source = ureq->file_size_req.source;
copy_req.file_name = req->file_ctx->file_name;
Expand Down Expand Up @@ -502,5 +524,43 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code
// notify aio_task
req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0);
}

void nfs_client_impl::register_cli_commands()
{
_ctrl_nfs_max_copy_megabytes_rate = dsn::command_manager::instance().register_app_command(
{"nfs.max_copy_megabytes_rate"},
"nfs.max_copy_megabytes_rate [num | DEFAULT]",
"control the max rate(MB/s) to copy file from remote node",
[this](const std::vector<std::string> &args) {
std::string result("OK");

if (args.empty()) {
return std::to_string(_opts.max_copy_megabytes_rate);
}

if (args[0] == "DEFAULT") {
uint32_t burst_size = 1.5 * (_opts.max_copy_megabytes_rate << 20);
_copy_token_bucket->reset(_opts.max_copy_megabytes_rate << 20, burst_size);
return result;
}

int32_t max_copy_megabytes_rate = 0;
if (!dsn::buf2int32(args[0], max_copy_megabytes_rate) || max_copy_megabytes_rate <= 0) {
return std::string("ERR: invalid arguments");
}

uint32_t burst_size = 1.5 * (max_copy_megabytes_rate << 20);
if (burst_size < _opts.nfs_copy_block_bytes) {
result = std::string("burst_size = 1.5 * (max_copy_megabytes_rate << 20) should be "
"greater than nfs_copy_block_bytes:")
.append(std::to_string(_opts.nfs_copy_block_bytes));
return result;
}
_copy_token_bucket->reset(_opts.max_copy_megabytes_rate << 20, burst_size);
_opts.max_copy_megabytes_rate = max_copy_megabytes_rate;
return result;
});
dassert(_ctrl_nfs_max_copy_megabytes_rate, "register cli handler failed");
}
}
}
15 changes: 13 additions & 2 deletions src/dist/nfs/nfs_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@
#include <dsn/tool-api/zlocks.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/nfs_node.h>

#include <dsn/utility/defer.h>
#include <dsn/utility/TokenBucket.h>
#include <dsn/utility/flags.h>
#include "nfs_client.h"

namespace dsn {
namespace service {

using TokenBucket = folly::BasicTokenBucket<std::chrono::steady_clock>;

struct nfs_opts
{
uint32_t nfs_copy_block_bytes;
uint32_t max_copy_megabytes_rate;
int max_concurrent_remote_copy_requests;
int max_concurrent_local_writes;
int max_buffered_local_writes;
Expand Down Expand Up @@ -288,7 +293,7 @@ class nfs_client_impl : public ::dsn::service::nfs_client

public:
nfs_client_impl(nfs_opts &opts);
virtual ~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); }
virtual ~nfs_client_impl();

// copy file request entry
void begin_remote_copy(std::shared_ptr<remote_copy_request> &rci, aio_task *nfs_task);
Expand All @@ -309,9 +314,15 @@ class nfs_client_impl : public ::dsn::service::nfs_client

void handle_completion(const user_request_ptr &req, error_code err);

void register_cli_commands();

private:
nfs_opts &_opts;

std::unique_ptr<folly::TokenBucket> _copy_token_bucket; // rate limiter of copy from remote
dsn_handle_t
_ctrl_nfs_max_copy_megabytes_rate; // use remote-command update the max_copy_megabytes_rate

std::atomic<int> _concurrent_copy_request_count; // record concurrent request count, limited
// by max_concurrent_remote_copy_requests.
std::atomic<int> _concurrent_local_write_count; // record concurrent write count, limited
Expand Down

0 comments on commit 7dd15ab

Please sign in to comment.