From 76cdc503bc33f09978cb09f2b95045a00b2186a6 Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Thu, 7 Jan 2021 16:40:04 +0800 Subject: [PATCH] feat: add read limiter for hdfs (#720) --- src/block_service/hdfs/hdfs_service.cpp | 11 ++++++++++- src/block_service/hdfs/hdfs_service.h | 12 ++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/block_service/hdfs/hdfs_service.cpp b/src/block_service/hdfs/hdfs_service.cpp index 4e604badd8..9cf2ed8f12 100644 --- a/src/block_service/hdfs/hdfs_service.cpp +++ b/src/block_service/hdfs/hdfs_service.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include namespace dsn { @@ -42,13 +43,16 @@ DSN_DEFINE_uint64("replication", "hdfs read batch size, the default value is 64MB"); DSN_TAG_VARIABLE(hdfs_read_batch_size_bytes, FT_MUTABLE); +DSN_DEFINE_uint32("replication", hdfs_read_limit_rate_megabytes, 200, "hdfs read limit(MB/s)"); +DSN_TAG_VARIABLE(hdfs_read_limit_rate_megabytes, FT_MUTABLE); + DSN_DEFINE_uint64("replication", hdfs_write_batch_size_bytes, 64 << 20, "hdfs write batch size, the default value is 64MB"); DSN_TAG_VARIABLE(hdfs_write_batch_size_bytes, FT_MUTABLE); -hdfs_service::hdfs_service() {} +hdfs_service::hdfs_service() { _read_token_bucket.reset(new folly::DynamicTokenBucket()); } hdfs_service::~hdfs_service() { @@ -386,7 +390,12 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t start_pos, uint64_t read_size = 0; bool read_success = true; while (cur_pos < start_pos + data_length) { + const uint64_t rate = FLAGS_hdfs_read_limit_rate_megabytes << 20; read_size = std::min(start_pos + data_length - cur_pos, FLAGS_hdfs_read_batch_size_bytes); + // burst size should not be less than consume size + _service->_read_token_bucket->consumeWithBorrowAndWait( + read_size, rate, std::max(2 * rate, read_size)); + tSize num_read_bytes = hdfsPread(_service->get_fs(), read_file, static_cast(cur_pos), diff --git a/src/block_service/hdfs/hdfs_service.h b/src/block_service/hdfs/hdfs_service.h index ce58f577bd..0ded4f7d46 100644 --- a/src/block_service/hdfs/hdfs_service.h +++ b/src/block_service/hdfs/hdfs_service.h @@ -20,6 +20,13 @@ #include #include +namespace folly { +template +class BasicDynamicTokenBucket; + +using DynamicTokenBucket = BasicDynamicTokenBucket; +} + namespace dsn { namespace dist { namespace block_service { @@ -52,6 +59,11 @@ class hdfs_service : public block_filesystem hdfsFS _fs; std::string _hdfs_name_node; std::string _hdfs_path; + + std::unique_ptr _read_token_bucket; + // TODO: add write limiter + + friend class hdfs_file_object; }; class hdfs_file_object : public block_file