Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: add rate limit for learning and support remote-command (#461)
Browse files Browse the repository at this point in the history
foreverneverer authored May 19, 2020
1 parent 466bca2 commit c8e1a80
Showing 3 changed files with 75 additions and 16 deletions.
23 changes: 9 additions & 14 deletions src/dist/block_service/fds/fds_service.cpp
Original file line number Diff line number Diff line change
@@ -103,8 +103,6 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5";

fds_service::fds_service()
{
const int BYTE_TO_BIT = 8;

/// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25]
/// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25].
/// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration,
@@ -113,30 +111,27 @@ fds_service::fds_service()
uint64_t target_file_size =
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_target_file_size_base",
64 * 1024 * 1024,
64 << 20,
"rocksdb options.target_file_size_base");
uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server",
"rocksdb_write_buffer_size",
64 * 1024 * 1024,
64 << 20,
"rocksdb options.write_buffer_size");
uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)1.25 * write_buffer_size);

uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64(
"replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)");
"replication", "fds_write_limit_rate", 100, "rate limit of fds(MB/s)");
/// For write operation, we can't send a file in batches. Because putContent interface of fds
/// will overwrite what was sent before for the same file. So we must send a file as a whole.
/// If file size > burst size, the file will be rejected by the token bucket.
/// Here we set burst_size = max_sst_file_size + 3MB, a litte greater than max_sst_file_size
uint32_t burst_size =
std::max(2 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6);
_write_token_bucket.reset(
new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));
uint32_t burst_size = std::max(2.0 * (write_rate_limit << 20), max_sst_file_size + 3e6);
_write_token_bucket.reset(new folly::TokenBucket(write_rate_limit << 20, burst_size));

uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64(
"replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)");
burst_size = 2 * read_rate_limit * 1e6 / BYTE_TO_BIT;
_read_token_bucket.reset(
new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));
"replication", "fds_read_limit_rate", 100, "rate limit of fds(MB/s)");
burst_size = 2 * read_rate_limit << 20;
_read_token_bucket.reset(new folly::TokenBucket(read_rate_limit << 20, burst_size));
}

fds_service::~fds_service() {}
@@ -569,7 +564,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start,
/*out*/ uint64_t &transfered_bytes)
{
// the max batch size is 1MB
const uint64_t BATCH_MAX = 1e6;
const uint64_t BATCH_MAX = 1 << 20;
error_code err = ERR_OK;
transfered_bytes = 0;

55 changes: 55 additions & 0 deletions src/dist/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
@@ -34,11 +34,14 @@
*/
#include <dsn/utility/filesystem.h>
#include <queue>
#include <dsn/tool-api/command_manager.h>
#include "nfs_client_impl.h"

namespace dsn {
namespace service {

DSN_DEFINE_int32("nfs", max_copy_rate_megabytes, 500, "max rate of copying from remote node(MB/s)");

nfs_client_impl::nfs_client_impl(nfs_opts &opts)
: _opts(opts),
_concurrent_copy_request_count(0),
@@ -65,8 +68,20 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts)
"recent_write_fail_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"nfs client write fail count count in the recent period");

uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20;
// max_copy_rate_bytes should be greater than nfs_copy_block_bytes which is the max batch copy
// size once
dassert(max_copy_rate_bytes > _opts.nfs_copy_block_bytes,
"max_copy_rate_bytes should be greater than nfs_copy_block_bytes");
_copy_token_bucket.reset(new TokenBucket(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes));
_opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes;

register_cli_commands();
}

nfs_client_impl::~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); }

void nfs_client_impl::begin_remote_copy(std::shared_ptr<remote_copy_request> &rci,
aio_task *nfs_task)
{
@@ -217,6 +232,8 @@ void nfs_client_impl::continue_copy()
zauto_lock l(req->lock);
const user_request_ptr &ureq = req->file_ctx->user_req;
if (req->is_valid) {
_copy_token_bucket->consumeWithBorrowAndWait(req->size);

copy_request copy_req;
copy_req.source = ureq->file_size_req.source;
copy_req.file_name = req->file_ctx->file_name;
@@ -502,5 +519,43 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code
// notify aio_task
req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0);
}

void nfs_client_impl::register_cli_commands()
{
dsn::command_manager::instance().register_app_command(
{"nfs.max_copy_rate_megabytes"},
"nfs.max_copy_rate_megabytes [num | DEFAULT]",
"control the max rate(MB/s) to copy file from remote node",
[this](const std::vector<std::string> &args) {
std::string result("OK");

if (args.empty()) {
return std::to_string(_opts.max_copy_rate_megabytes);
}

if (args[0] == "DEFAULT") {
uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20;
_copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes);
_opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes;
return result;
}

int32_t max_copy_rate_megabytes = 0;
if (!dsn::buf2int32(args[0], max_copy_rate_megabytes) || max_copy_rate_megabytes <= 0) {
return std::string("ERR: invalid arguments");
}

uint32_t max_copy_rate_bytes = max_copy_rate_megabytes << 20;
if (max_copy_rate_bytes <= _opts.nfs_copy_block_bytes) {
result = std::string("ERR: max_copy_rate_bytes(max_copy_rate_megabytes << 20) "
"should be greater than nfs_copy_block_bytes:")
.append(std::to_string(_opts.nfs_copy_block_bytes));
return result;
}
_copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes);
_opts.max_copy_rate_megabytes = max_copy_rate_megabytes;
return result;
});
}
}
}
13 changes: 11 additions & 2 deletions src/dist/nfs/nfs_client_impl.h
Original file line number Diff line number Diff line change
@@ -39,15 +39,20 @@
#include <dsn/tool-api/zlocks.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/nfs_node.h>

#include <dsn/utility/defer.h>
#include <dsn/utility/TokenBucket.h>
#include <dsn/utility/flags.h>
#include "nfs_client.h"

namespace dsn {
namespace service {

using TokenBucket = folly::BasicTokenBucket<std::chrono::steady_clock>;

struct nfs_opts
{
uint32_t nfs_copy_block_bytes;
uint32_t max_copy_rate_megabytes;
int max_concurrent_remote_copy_requests;
int max_concurrent_local_writes;
int max_buffered_local_writes;
@@ -288,7 +293,7 @@ class nfs_client_impl : public ::dsn::service::nfs_client

public:
nfs_client_impl(nfs_opts &opts);
virtual ~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); }
virtual ~nfs_client_impl();

// copy file request entry
void begin_remote_copy(std::shared_ptr<remote_copy_request> &rci, aio_task *nfs_task);
@@ -309,9 +314,13 @@ class nfs_client_impl : public ::dsn::service::nfs_client

void handle_completion(const user_request_ptr &req, error_code err);

void register_cli_commands();

private:
nfs_opts &_opts;

std::unique_ptr<folly::TokenBucket> _copy_token_bucket; // rate limiter of copy from remote

std::atomic<int> _concurrent_copy_request_count; // record concurrent request count, limited
// by max_concurrent_remote_copy_requests.
std::atomic<int> _concurrent_local_write_count; // record concurrent write count, limited

0 comments on commit c8e1a80

Please sign in to comment.