Skip to content

Commit

Permalink
feat: add read limiter for hdfs (XiaoMi#720)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Jan 7, 2021
1 parent 5fe7ff1 commit 76cdc50
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/block_service/hdfs/hdfs_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <dsn/utility/filesystem.h>
#include <dsn/utility/flags.h>
#include <dsn/utility/safe_strerror_posix.h>
#include <dsn/utility/TokenBucket.h>
#include <dsn/utility/utils.h>

namespace dsn {
Expand All @@ -42,13 +43,16 @@ DSN_DEFINE_uint64("replication",
"hdfs read batch size, the default value is 64MB");
DSN_TAG_VARIABLE(hdfs_read_batch_size_bytes, FT_MUTABLE);

// Limit on HDFS read throughput, expressed in MB/s (default: 200).
// read_data_in_batches shifts this value left by 20 to obtain a byte rate and
// re-reads the flag at the start of every batch iteration.
DSN_DEFINE_uint32("replication", hdfs_read_limit_rate_megabytes, 200, "hdfs read limit(MB/s)");
// NOTE(review): FT_MUTABLE presumably marks the flag as changeable at runtime -- confirm.
DSN_TAG_VARIABLE(hdfs_read_limit_rate_megabytes, FT_MUTABLE);

// Size in bytes of a single HDFS write batch; the default is 64 << 20 (64MB).
DSN_DEFINE_uint64("replication",
hdfs_write_batch_size_bytes,
64 << 20,
"hdfs write batch size, the default value is 64MB");
// NOTE(review): FT_MUTABLE presumably marks the flag as changeable at runtime -- confirm.
DSN_TAG_VARIABLE(hdfs_write_batch_size_bytes, FT_MUTABLE);

hdfs_service::hdfs_service() {}
// Creates the service together with the token bucket that
// hdfs_file_object::read_data_in_batches uses to throttle reads.
hdfs_service::hdfs_service() : _read_token_bucket(new folly::DynamicTokenBucket()) {}

hdfs_service::~hdfs_service()
{
Expand Down Expand Up @@ -386,7 +390,12 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t start_pos,
uint64_t read_size = 0;
bool read_success = true;
while (cur_pos < start_pos + data_length) {
const uint64_t rate = FLAGS_hdfs_read_limit_rate_megabytes << 20;
read_size = std::min(start_pos + data_length - cur_pos, FLAGS_hdfs_read_batch_size_bytes);
// burst size should not be less than consume size
_service->_read_token_bucket->consumeWithBorrowAndWait(
read_size, rate, std::max(2 * rate, read_size));

tSize num_read_bytes = hdfsPread(_service->get_fs(),
read_file,
static_cast<tOffset>(cur_pos),
Expand Down
12 changes: 12 additions & 0 deletions src/block_service/hdfs/hdfs_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
#include <dsn/dist/block_service.h>
#include <hdfs/hdfs.h>

// Forward declaration of folly's dynamic token bucket, so that this header can
// name the type (e.g. behind a smart pointer) without its full definition.
namespace folly {

template <class Clock>
class BasicDynamicTokenBucket;

using DynamicTokenBucket = BasicDynamicTokenBucket<std::chrono::steady_clock>;

} // namespace folly

namespace dsn {
namespace dist {
namespace block_service {
Expand Down Expand Up @@ -52,6 +59,11 @@ class hdfs_service : public block_filesystem
hdfsFS _fs;
std::string _hdfs_name_node;
std::string _hdfs_path;

std::unique_ptr<folly::DynamicTokenBucket> _read_token_bucket;
// TODO: add write limiter

friend class hdfs_file_object;
};

class hdfs_file_object : public block_file
Expand Down

0 comments on commit 76cdc50

Please sign in to comment.