Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: make token bucket in fds configurable (#874)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored and hycdong committed Sep 1, 2021
1 parent 9f4ab4c commit c576fe3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 30 deletions.
45 changes: 24 additions & 21 deletions src/block_service/fds/fds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ namespace dist {
namespace block_service {

DSN_DEFINE_uint32("replication", fds_write_limit_rate, 100, "write rate limit of fds(MB/s)");
DSN_TAG_VARIABLE(fds_write_limit_rate, FT_MUTABLE);

DSN_DEFINE_uint32("replication", fds_write_burst_size, 500, "write burst size of fds(MB)");
DSN_TAG_VARIABLE(fds_write_burst_size, FT_MUTABLE);

DSN_DEFINE_uint32("replication", fds_read_limit_rate, 100, "read rate limit of fds(MB/s)");
DSN_TAG_VARIABLE(fds_read_limit_rate, FT_MUTABLE);

DSN_DEFINE_uint32("replication", fds_read_batch_size, 100, "read batch size of fds(MB)");
DSN_TAG_VARIABLE(fds_read_batch_size, FT_MUTABLE);

class utils
{
Expand Down Expand Up @@ -109,16 +115,8 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5";

fds_service::fds_service()
{
/// For write operation, we can't send a file in batches. Because putContent interface of fds
/// will overwrite what was sent before for the same file. So we must send a file as a whole.
/// If file size > burst size, the file will be rejected by the token bucket.
/// Here we set burst_size to max value of double, in order to make file not rejected by the
/// token bucket
_write_token_bucket.reset(new folly::TokenBucket(FLAGS_fds_write_limit_rate << 20,
std::numeric_limits<double>::max()));

uint32_t burst_size = 2 * FLAGS_fds_read_limit_rate << 20;
_read_token_bucket.reset(new folly::TokenBucket(FLAGS_fds_read_limit_rate << 20, burst_size));
_write_token_bucket.reset(new folly::DynamicTokenBucket());
_read_token_bucket.reset(new folly::DynamicTokenBucket());
}

fds_service::~fds_service() {}
Expand Down Expand Up @@ -427,8 +425,6 @@ error_code fds_file_object::get_content_in_batches(uint64_t start,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes)
{
const uint64_t BATCH_SIZE = std::min(FLAGS_fds_read_limit_rate, FLAGS_fds_read_batch_size)
<< 20;
error_code err = ERR_OK;
transfered_bytes = 0;

Expand All @@ -446,16 +442,20 @@ error_code fds_file_object::get_content_in_batches(uint64_t start,
uint64_t pos = start;
uint64_t once_transfered_bytes = 0;
while (pos < start + to_transfer_bytes) {
uint64_t batch_len = std::min(BATCH_SIZE, start + to_transfer_bytes - pos);
// get tokens from token bucket
_service->_read_token_bucket->consumeWithBorrowAndWait(batch_len);
const uint64_t BATCH_SIZE = FLAGS_fds_read_batch_size << 20;
uint64_t batch_size = std::min(BATCH_SIZE, start + to_transfer_bytes - pos);

// burst size should not be less than consume size
const uint64_t rate = FLAGS_fds_read_limit_rate << 20;
_service->_read_token_bucket->consumeWithBorrowAndWait(
batch_size, rate, std::max(2 * rate, batch_size));

err = get_content(pos, batch_len, os, once_transfered_bytes);
err = get_content(pos, batch_size, os, once_transfered_bytes);
transfered_bytes += once_transfered_bytes;
if (err != ERR_OK || once_transfered_bytes < batch_len) {
if (err != ERR_OK || once_transfered_bytes < batch_size) {
return err;
}
pos += batch_len;
pos += batch_size;
}

return ERR_OK;
Expand Down Expand Up @@ -516,11 +516,14 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
galaxy::fds::GalaxyFDSClient *c = _service->get_client();

// get tokens from token bucket
if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes)) {
ddebug_f("the transfer count({}) is greater than burst size({}), so it is rejected by "
if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes,
FLAGS_fds_write_limit_rate << 20,
FLAGS_fds_write_burst_size
<< 20)) {
ddebug_f("the transfer count({}B) is greater than burst size({}MB), so it is rejected by "
"token bucket",
to_transfer_bytes,
_service->_write_token_bucket->burst());
FLAGS_fds_write_burst_size);
return ERR_BUSY;
}

Expand Down
18 changes: 9 additions & 9 deletions src/block_service/fds/fds_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@

namespace folly {
template <typename Clock>
class BasicTokenBucket;
class BasicDynamicTokenBucket;

using TokenBucket = BasicTokenBucket<std::chrono::steady_clock>;
}
using DynamicTokenBucket = BasicDynamicTokenBucket<std::chrono::steady_clock>;
} // namespace folly

namespace galaxy {
namespace fds {
class GalaxyFDSClient;
}
}
} // namespace galaxy

namespace dsn {
namespace dist {
Expand Down Expand Up @@ -58,8 +58,8 @@ class fds_service : public block_filesystem
private:
std::shared_ptr<galaxy::fds::GalaxyFDSClient> _client;
std::string _bucket_name;
std::unique_ptr<folly::TokenBucket> _read_token_bucket;
std::unique_ptr<folly::TokenBucket> _write_token_bucket;
std::unique_ptr<folly::DynamicTokenBucket> _write_token_bucket;
std::unique_ptr<folly::DynamicTokenBucket> _read_token_bucket;

friend class fds_file_object;
};
Expand Down Expand Up @@ -120,7 +120,7 @@ class fds_file_object : public block_file

static const size_t PIECE_SIZE = 16384; // 16k
};
}
}
}
} // namespace block_service
} // namespace dist
} // namespace dsn
#endif // FDS_SERVICE_H

0 comments on commit c576fe3

Please sign in to comment.