Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add rate limit for learning and support remote-command #461

Merged
merged 8 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/dist/block_service/fds/fds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5";

fds_service::fds_service()
{
const int BYTE_TO_BIT = 8;

/// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25]
/// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25].
/// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration,
Expand All @@ -113,30 +111,27 @@ fds_service::fds_service()
uint64_t target_file_size =
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_target_file_size_base",
64 * 1024 * 1024,
64 << 20,
"rocksdb options.target_file_size_base");
uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server",
"rocksdb_write_buffer_size",
64 * 1024 * 1024,
64 << 20,
"rocksdb options.write_buffer_size");
uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)1.25 * write_buffer_size);

uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64(
"replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)");
"replication", "fds_write_limit_rate", 100, "rate limit of fds(MB/s)");
/// For write operation, we can't send a file in batches. Because putContent interface of fds
/// will overwrite what was sent before for the same file. So we must send a file as a whole.
/// If file size > burst size, the file will be rejected by the token bucket.
/// Here we set burst_size = max_sst_file_size + 3MB, a little greater than max_sst_file_size
uint32_t burst_size =
std::max(2 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6);
_write_token_bucket.reset(
new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));
uint32_t burst_size = std::max(2.0 * (write_rate_limit << 20), max_sst_file_size + 3e6);
_write_token_bucket.reset(new folly::TokenBucket(write_rate_limit << 20, burst_size));

uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64(
"replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)");
burst_size = 2 * read_rate_limit * 1e6 / BYTE_TO_BIT;
_read_token_bucket.reset(
new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));
"replication", "fds_read_limit_rate", 100, "rate limit of fds(MB/s)");
burst_size = 2 * read_rate_limit << 20;
_read_token_bucket.reset(new folly::TokenBucket(read_rate_limit << 20, burst_size));
}

fds_service::~fds_service() {}
Expand Down Expand Up @@ -569,7 +564,7 @@ error_code fds_file_object::get_content_in_batches(uint64_t start,
/*out*/ uint64_t &transfered_bytes)
{
// the max batch size is 1MB
const uint64_t BATCH_MAX = 1e6;
const uint64_t BATCH_MAX = 1 << 20;
error_code err = ERR_OK;
transfered_bytes = 0;

Expand Down
55 changes: 55 additions & 0 deletions src/dist/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
*/
#include <dsn/utility/filesystem.h>
#include <queue>
#include <dsn/tool-api/command_manager.h>
#include "nfs_client_impl.h"

namespace dsn {
namespace service {

DSN_DEFINE_int32("nfs", max_copy_rate_megabytes, 500, "max rate of copying from remote node(MB/s)");
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

nfs_client_impl::nfs_client_impl(nfs_opts &opts)
: _opts(opts),
_concurrent_copy_request_count(0),
Expand All @@ -65,8 +68,20 @@ nfs_client_impl::nfs_client_impl(nfs_opts &opts)
"recent_write_fail_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"nfs client write fail count count in the recent period");

uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20;
// max_copy_rate_bytes should be greater than nfs_copy_block_bytes, which is the maximum size
// of a single batch copy
dassert(max_copy_rate_bytes > _opts.nfs_copy_block_bytes,
"max_copy_rate_bytes should be greater than nfs_copy_block_bytes");
_copy_token_bucket.reset(new TokenBucket(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes));
_opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes;

register_cli_commands();
}

// Cancels every outstanding task held by _tracker when the client is destroyed.
nfs_client_impl::~nfs_client_impl()
{
    _tracker.cancel_outstanding_tasks();
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

void nfs_client_impl::begin_remote_copy(std::shared_ptr<remote_copy_request> &rci,
aio_task *nfs_task)
{
Expand Down Expand Up @@ -217,6 +232,8 @@ void nfs_client_impl::continue_copy()
zauto_lock l(req->lock);
const user_request_ptr &ureq = req->file_ctx->user_req;
if (req->is_valid) {
_copy_token_bucket->consumeWithBorrowAndWait(req->size);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

copy_request copy_req;
copy_req.source = ureq->file_size_req.source;
copy_req.file_name = req->file_ctx->file_name;
Expand Down Expand Up @@ -502,5 +519,43 @@ void nfs_client_impl::handle_completion(const user_request_ptr &req, error_code
// notify aio_task
req->nfs_task->enqueue(err, err == ERR_OK ? total_size : 0);
}

/// Registers the "nfs.max_copy_rate_megabytes" remote command, which queries or updates the
/// rate limit (MB/s) used when copying files from a remote node.
///
/// Command forms:
///   - no argument: returns the current limit stored in _opts.max_copy_rate_megabytes;
///   - "DEFAULT":   restores the limit to FLAGS_max_copy_rate_megabytes;
///   - <num>:       sets the limit to <num> MB/s. <num> must be a positive int32 whose value
///                  in bytes fits in uint32_t and is greater than _opts.nfs_copy_block_bytes.
///
/// The handler returns "OK" after a successful update and a string starting with "ERR:" when
/// the argument is rejected; a rejected argument leaves the token bucket and _opts untouched.
///
/// NOTE(review): the handler captures `this`; the visible destructor does not deregister the
/// command, so confirm the command cannot be invoked after this object is destroyed.
void nfs_client_impl::register_cli_commands()
{
    dsn::command_manager::instance().register_app_command(
        {"nfs.max_copy_rate_megabytes"},
        "nfs.max_copy_rate_megabytes [num | DEFAULT]",
        "control the max rate(MB/s) to copy file from remote node",
        [this](const std::vector<std::string> &args) {
            std::string result("OK");

            // Query form: report the limit currently in effect.
            if (args.empty()) {
                return std::to_string(_opts.max_copy_rate_megabytes);
            }

            if (args[0] == "DEFAULT") {
                // The configured default was already checked against nfs_copy_block_bytes
                // when the client was constructed.
                uint32_t max_copy_rate_bytes = FLAGS_max_copy_rate_megabytes << 20;
                _copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes);
                _opts.max_copy_rate_megabytes = FLAGS_max_copy_rate_megabytes;
                return result;
            }

            int32_t max_copy_rate_megabytes = 0;
            if (!dsn::buf2int32(args[0], max_copy_rate_megabytes) ||
                max_copy_rate_megabytes <= 0) {
                return std::string("ERR: invalid arguments");
            }

            // Convert to bytes in 64 bits: shifting the int32 directly overflows for values
            // >= 2048 and wraps around in uint32_t for values >= 4096, which would silently
            // install a far smaller limit than the one requested.
            const uint64_t wide_rate_bytes = static_cast<uint64_t>(max_copy_rate_megabytes) << 20;
            if (wide_rate_bytes > 0xFFFFFFFFULL) {
                return std::string("ERR: max_copy_rate_megabytes is too large, "
                                   "max_copy_rate_megabytes << 20 should fit in uint32");
            }

            const uint32_t max_copy_rate_bytes = static_cast<uint32_t>(wide_rate_bytes);
            if (max_copy_rate_bytes <= _opts.nfs_copy_block_bytes) {
                result = std::string("ERR: max_copy_rate_bytes(max_copy_rate_megabytes << 20) "
                                     "should be greater than nfs_copy_block_bytes:")
                             .append(std::to_string(_opts.nfs_copy_block_bytes));
                return result;
            }

            // Burst capacity is 1.5x the per-second rate, matching the constructor.
            _copy_token_bucket->reset(max_copy_rate_bytes, 1.5 * max_copy_rate_bytes);
            _opts.max_copy_rate_megabytes = max_copy_rate_megabytes;
            return result;
        });
}
}
}
13 changes: 11 additions & 2 deletions src/dist/nfs/nfs_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@
#include <dsn/tool-api/zlocks.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/nfs_node.h>

#include <dsn/utility/defer.h>
#include <dsn/utility/TokenBucket.h>
#include <dsn/utility/flags.h>
#include "nfs_client.h"

namespace dsn {
namespace service {

// Token bucket type measured against std::chrono::steady_clock.
using TokenBucket = folly::BasicTokenBucket<std::chrono::steady_clock>;

struct nfs_opts
{
uint32_t nfs_copy_block_bytes;
uint32_t max_copy_rate_megabytes;
int max_concurrent_remote_copy_requests;
int max_concurrent_local_writes;
int max_buffered_local_writes;
Expand Down Expand Up @@ -288,7 +293,7 @@ class nfs_client_impl : public ::dsn::service::nfs_client

public:
nfs_client_impl(nfs_opts &opts);
virtual ~nfs_client_impl() { _tracker.cancel_outstanding_tasks(); }
virtual ~nfs_client_impl();

// copy file request entry
void begin_remote_copy(std::shared_ptr<remote_copy_request> &rci, aio_task *nfs_task);
Expand All @@ -309,9 +314,13 @@ class nfs_client_impl : public ::dsn::service::nfs_client

void handle_completion(const user_request_ptr &req, error_code err);

void register_cli_commands();

private:
nfs_opts &_opts;

std::unique_ptr<folly::TokenBucket> _copy_token_bucket; // rate limiter of copy from remote

std::atomic<int> _concurrent_copy_request_count; // record concurrent request count, limited
// by max_concurrent_remote_copy_requests.
std::atomic<int> _concurrent_local_write_count; // record concurrent write count, limited
Expand Down