diff --git a/src/block_service/fds/fds_service.cpp b/src/block_service/fds/fds_service.cpp index 7f544f93e4..4b37b7b1fa 100644 --- a/src/block_service/fds/fds_service.cpp +++ b/src/block_service/fds/fds_service.cpp @@ -28,10 +28,16 @@ namespace dist { namespace block_service { DSN_DEFINE_uint32("replication", fds_write_limit_rate, 100, "write rate limit of fds(MB/s)"); +DSN_TAG_VARIABLE(fds_write_limit_rate, FT_MUTABLE); + +DSN_DEFINE_uint32("replication", fds_write_burst_size, 500, "write burst size of fds(MB)"); +DSN_TAG_VARIABLE(fds_write_burst_size, FT_MUTABLE); DSN_DEFINE_uint32("replication", fds_read_limit_rate, 100, "read rate limit of fds(MB/s)"); +DSN_TAG_VARIABLE(fds_read_limit_rate, FT_MUTABLE); DSN_DEFINE_uint32("replication", fds_read_batch_size, 100, "read batch size of fds(MB)"); +DSN_TAG_VARIABLE(fds_read_batch_size, FT_MUTABLE); class utils { @@ -109,16 +115,8 @@ const std::string fds_service::FILE_MD5_KEY = "content-md5"; fds_service::fds_service() { - /// For write operation, we can't send a file in batches. Because putContent interface of fds - /// will overwrite what was sent before for the same file. So we must send a file as a whole. - /// If file size > burst size, the file will be rejected by the token bucket. - /// Here we set burst_size to max value of double, in order to make file not rejected by the - /// token bucket - _write_token_bucket.reset(new folly::TokenBucket(FLAGS_fds_write_limit_rate << 20, - std::numeric_limits::max())); - - uint32_t burst_size = 2 * FLAGS_fds_read_limit_rate << 20; - _read_token_bucket.reset(new folly::TokenBucket(FLAGS_fds_read_limit_rate << 20, burst_size)); + _write_token_bucket.reset(new folly::DynamicTokenBucket()); + _read_token_bucket.reset(new folly::DynamicTokenBucket()); } fds_service::~fds_service() {} @@ -427,8 +425,6 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, /*out*/ std::ostream &os, /*out*/ uint64_t &transfered_bytes) { - const uint64_t BATCH_SIZE = std::min(FLAGS_fds_read_limit_rate, FLAGS_fds_read_batch_size) - << 20; error_code err = ERR_OK; transfered_bytes = 0; @@ -446,16 +442,20 @@ error_code fds_file_object::get_content_in_batches(uint64_t start, uint64_t pos = start; uint64_t once_transfered_bytes = 0; while (pos < start + to_transfer_bytes) { - uint64_t batch_len = std::min(BATCH_SIZE, start + to_transfer_bytes - pos); - // get tokens from token bucket - _service->_read_token_bucket->consumeWithBorrowAndWait(batch_len); + const uint64_t BATCH_SIZE = FLAGS_fds_read_batch_size << 20; + uint64_t batch_size = std::min(BATCH_SIZE, start + to_transfer_bytes - pos); + + // burst size should not be less than consume size + const uint64_t rate = FLAGS_fds_read_limit_rate << 20; + _service->_read_token_bucket->consumeWithBorrowAndWait( + batch_size, rate, std::max(2 * rate, batch_size)); - err = get_content(pos, batch_len, os, once_transfered_bytes); + err = get_content(pos, batch_size, os, once_transfered_bytes); transfered_bytes += once_transfered_bytes; - if (err != ERR_OK || once_transfered_bytes < batch_len) { + if (err != ERR_OK || once_transfered_bytes < batch_size) { return err; } - pos += batch_len; + pos += batch_size; } return ERR_OK; @@ -516,11 +516,14 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is, galaxy::fds::GalaxyFDSClient *c = _service->get_client(); // get tokens from token bucket - if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes)) { - ddebug_f("the transfer count({}) is greater than burst size({}), so it is rejected by " + if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes, + FLAGS_fds_write_limit_rate << 20, + FLAGS_fds_write_burst_size + << 20)) { + ddebug_f("the transfer count({}B) is greater than burst size({}MB), so it is rejected by " "token bucket", to_transfer_bytes, - _service->_write_token_bucket->burst()); + FLAGS_fds_write_burst_size); return ERR_BUSY; } diff --git a/src/block_service/fds/fds_service.h b/src/block_service/fds/fds_service.h index 2d5ff9e8e3..dce709738d 100644 --- a/src/block_service/fds/fds_service.h +++ b/src/block_service/fds/fds_service.h @@ -5,16 +5,16 @@ namespace folly { template -class BasicTokenBucket; +class BasicDynamicTokenBucket; -using TokenBucket = BasicTokenBucket; -} +using DynamicTokenBucket = BasicDynamicTokenBucket; +} // namespace folly namespace galaxy { namespace fds { class GalaxyFDSClient; } -} +} // namespace galaxy namespace dsn { namespace dist { @@ -58,8 +58,8 @@ class fds_service : public block_filesystem private: std::shared_ptr _client; std::string _bucket_name; - std::unique_ptr _read_token_bucket; - std::unique_ptr _write_token_bucket; + std::unique_ptr _write_token_bucket; + std::unique_ptr _read_token_bucket; friend class fds_file_object; }; @@ -120,7 +120,7 @@ class fds_file_object : public block_file static const size_t PIECE_SIZE = 16384; // 16k }; -} -} -} +} // namespace block_service +} // namespace dist +} // namespace dsn #endif // FDS_SERVICE_H