From c85e7b42fa689858fb4a84462ea6fc6f538e02aa Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Tue, 19 Oct 2021 17:11:20 +0800 Subject: [PATCH 1/3] finish coding --- include/dsn/cpp/rpc_holder.h | 9 +- include/dsn/dist/replication/replica_envs.h | 1 + .../dist/replication/replication_app_base.h | 3 + .../token_bucket_throttling_controller.h | 77 +++++++++ src/common/replication_common.cpp | 1 + src/meta/app_env_validator.cpp | 5 + src/replica/replica.h | 3 +- src/replica/replication_app_base.cpp | 12 ++ ...oken_bucket_throttling_controller_test.cpp | 147 +++++++++++++++++ .../token_bucket_throttling_controller.cpp | 148 ++++++++++++++++++ 10 files changed, 404 insertions(+), 2 deletions(-) create mode 100644 include/dsn/utils/token_bucket_throttling_controller.h create mode 100644 src/utils/test/token_bucket_throttling_controller_test.cpp create mode 100644 src/utils/token_bucket_throttling_controller.cpp diff --git a/include/dsn/cpp/rpc_holder.h b/include/dsn/cpp/rpc_holder.h index 031745d333..bba436b31c 100644 --- a/include/dsn/cpp/rpc_holder.h +++ b/include/dsn/cpp/rpc_holder.h @@ -114,6 +114,12 @@ class rpc_holder return _i->thrift_response; } + dsn::error_code &error() const + { + dassert(_i, "rpc_holder is uninitialized"); + return _i->rpc_error; + } + message_ex *dsn_request() const { dassert(_i, "rpc_holder is uninitialized"); @@ -291,7 +297,7 @@ class rpc_holder message_ex *dsn_response = dsn_request->create_response(); marshall(dsn_response, thrift_response); - dsn_rpc_reply(dsn_response); + dsn_rpc_reply(dsn_response, rpc_error); } ~internal() @@ -305,6 +311,7 @@ class rpc_holder message_ex *dsn_request; std::unique_ptr thrift_request; TResponse thrift_response; + dsn::error_code rpc_error = dsn::ERR_OK; bool auto_reply; }; diff --git a/include/dsn/dist/replication/replica_envs.h b/include/dsn/dist/replication/replica_envs.h index 21446597c4..0ecb8ca7fc 100644 --- a/include/dsn/dist/replication/replica_envs.h +++ b/include/dsn/dist/replication/replica_envs.h @@ -57,6 +57,7 @@ class replica_envs static const std::string BUSINESS_INFO; static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS; static const std::string READ_QPS_THROTTLING; + static const std::string READ_SIZE_THROTTLING; static const std::string BACKUP_REQUEST_QPS_THROTTLING; static const std::string SPLIT_VALIDATE_PARTITION_HASH; static const std::string USER_SPECIFIED_COMPACTION; diff --git a/include/dsn/dist/replication/replication_app_base.h b/include/dsn/dist/replication/replication_app_base.h index 8a4846fd1a..5bbe122889 100644 --- a/include/dsn/dist/replication/replication_app_base.h +++ b/include/dsn/dist/replication/replication_app_base.h @@ -261,6 +261,7 @@ class replication_app_base : public replica_base const std::string &learn_dir() const { return _dir_learn; } const std::string &backup_dir() const { return _dir_backup; } const std::string &bulk_load_dir() const { return _dir_bulk_load; } + const app_info *get_app_info() const; ::dsn::replication::decree last_committed_decree() const { return _last_committed_decree.load(); @@ -294,6 +295,8 @@ class replication_app_base : public replica_base replica_init_info _info; explicit replication_app_base(::dsn::replication::replica *replica); + + perf_counter_wrapper* get_counter_recent_read_throttling_reject_count(); }; } // namespace replication diff --git a/include/dsn/utils/token_bucket_throttling_controller.h b/include/dsn/utils/token_bucket_throttling_controller.h new file mode 100644 index 0000000000..5d8ea59f8d --- /dev/null +++ b/include/dsn/utils/token_bucket_throttling_controller.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace dsn { +namespace utils { + +using DynamicTokenBucket = folly::BasicDynamicTokenBucket; + +// token_bucket_throttling_controller ignores `delay` parameter +class token_bucket_throttling_controller +{ +private: + friend class token_bucket_throttling_controller_test; + + std::unique_ptr _token_bucket; + + bool _enabled; + std::string _env_value; + int32_t _partition_count = 0; + double _rate; + double _burstsize; + dsn::perf_counter_wrapper* _reject_task_counter; + +public: + token_bucket_throttling_controller(); + + token_bucket_throttling_controller(dsn::perf_counter_wrapper* reject_task_counter); + + bool control(int32_t request_units); + + void only_count(int32_t request_units); + + void reset(bool &changed, std::string &old_env_value); + // return the current env value. + const std::string &env_value() const; + + bool parse_from_env(const std::string &env_value, + int partition_count, + std::string &parse_error, + bool &changed, + std::string &old_env_value); + + static bool string_to_value(std::string str,int64_t &value); + + static bool validate(const std::string &env, std::string &hint_message); + + static bool transform_env_string(const std::string &env, int64_t &reject_size_value,std::string &hint_message); + +}; + +} // namespace replication +} // namespace dsn diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index 8f9989f1fd..012e6690d1 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -656,6 +656,7 @@ const std::string replica_envs::BUSINESS_INFO("business.info"); const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS( "replica_access_controller.allowed_users"); const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling"); +const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size"); const std::string replica_envs::SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash"); const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compaction"); diff --git a/src/meta/app_env_validator.cpp b/src/meta/app_env_validator.cpp index c9bf3dbbc8..8ba1835b73 100644 --- a/src/meta/app_env_validator.cpp +++ b/src/meta/app_env_validator.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "app_env_validator.h" namespace dsn { @@ -185,6 +186,10 @@ void app_env_validator::register_all_validators() {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}, {replica_envs::READ_QPS_THROTTLING, std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}, + {replica_envs::READ_SIZE_THROTTLING, + std::bind(&utils::token_bucket_throttling_controller::validate, + std::placeholders::_1, + std::placeholders::_2)}, {replica_envs::SPLIT_VALIDATE_PARTITION_HASH, std::bind(&check_split_validation, std::placeholders::_1, std::placeholders::_2)}, {replica_envs::USER_SPECIFIED_COMPACTION, nullptr}, diff --git a/src/replica/replica.h b/src/replica/replica.h index 4589623cdc..e9ab3bd597 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -48,6 +48,7 @@ #include #include +#include #include "common/replication_common.h" #include "mutation.h" @@ -62,7 +63,6 @@ class access_controller; } // namespace security namespace replication { -class replication_app_base; class replica_stub; class replica_duplicator_manager; class replica_backup_manager; @@ -467,6 +467,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_disk_migrator; friend class replica_disk_test; friend class replica_disk_migrate_test; + friend perf_counter_wrapper* replication_app_base::get_counter_recent_read_throttling_reject_count(); // replica configuration, updated by update_local_configuration ONLY replica_configuration _config; diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index 98c4e3d6d2..b235101387 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -603,5 +603,17 @@ ::dsn::error_code replication_app_base::update_init_info_ballot_and_decree(repli _info.init_offset_in_private_log, r->last_durable_decree()); } + +const app_info *replication_app_base::get_app_info() const { return _replica->get_app_info(); } + +perf_counter_wrapper *replication_app_base::get_counter_recent_read_throttling_reject_count() +{ + dassert(_replica, "_replica is null"); + dassert(&_replica->_counter_recent_read_throttling_reject_count, + "_counter_recent_read_throttling_reject_count is null"); + + return &_replica->_counter_recent_read_throttling_reject_count; +} + } // namespace replication } // namespace dsn diff --git a/src/utils/test/token_bucket_throttling_controller_test.cpp b/src/utils/test/token_bucket_throttling_controller_test.cpp new file mode 100644 index 0000000000..198cb71481 --- /dev/null +++ b/src/utils/test/token_bucket_throttling_controller_test.cpp @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include + +namespace dsn { +namespace utils { + +class token_bucket_throttling_controller_test : public ::testing::Test +{ +public: + void test_parse_env_basic_token_bucket_throttling() + { + token_bucket_throttling_controller cntl; + std::string parse_err; + bool env_changed = false; + std::string old_value; + + // token_bucket_throttling_controller doesn't support delay only + ASSERT_TRUE(cntl.parse_from_env("20000*delay*100", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(cntl._env_value, "20000*delay*100"); + ASSERT_EQ(cntl._partition_count, 4); + ASSERT_EQ(cntl._rate, INT_MAX); + ASSERT_EQ(cntl._burstsize, INT_MAX); + ASSERT_EQ(env_changed, true); + ASSERT_EQ(old_value, ""); + ASSERT_EQ(parse_err, ""); + + ASSERT_TRUE(cntl.parse_from_env("200K", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(cntl._env_value, "200K"); + ASSERT_EQ(cntl._partition_count, 4); + ASSERT_EQ(cntl._rate, 200000); + ASSERT_EQ(cntl._burstsize, 200000); + ASSERT_EQ(env_changed, true); + ASSERT_EQ(old_value, "20000*delay*100"); + ASSERT_EQ(parse_err, ""); + + ASSERT_TRUE(cntl.parse_from_env( + "20000*delay*100,20000*reject*100", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(cntl._env_value, "20000*delay*100,20000*reject*100"); + ASSERT_EQ(cntl._partition_count, 4); + ASSERT_EQ(cntl._rate, 20000); + ASSERT_EQ(cntl._burstsize, 20000); + ASSERT_EQ(env_changed, true); + ASSERT_EQ(old_value, "200K"); + ASSERT_EQ(parse_err, ""); + + // invalid argument + + ASSERT_FALSE(cntl.parse_from_env("*delay*100", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(env_changed, false); + ASSERT_NE(parse_err, ""); + ASSERT_EQ(cntl._enabled, true); // ensure invalid env won't stop throttling + + ASSERT_FALSE(cntl.parse_from_env("", 4, parse_err, env_changed, old_value)); + ASSERT_EQ(env_changed, false); + ASSERT_NE(parse_err, ""); + ASSERT_EQ(cntl._enabled, true); + } + + void throttle_test() + { + auto cntl = std::make_unique(); + std::string parse_err; + bool env_changed = false; + std::string old_value; + const int partition_count = 4; + + int throttle_limit = 200000; + cntl->parse_from_env( + std::to_string(throttle_limit), partition_count, parse_err, env_changed, old_value); + + auto token_bucket = std::make_unique(); + int fail_count = 0; + for (int i = 0; i < 100000; i++) { + token_bucket->consumeWithBorrowAndWait( + 1, throttle_limit / partition_count * 0.8, throttle_limit / partition_count * 1.0); + if (!cntl->control(1)) { + fail_count++; + } + } + ASSERT_EQ(fail_count, 0); + + sleep(1); + + fail_count = 0; + for (int i = 0; i < 100000; i++) { + token_bucket->consumeWithBorrowAndWait( + 1, throttle_limit / partition_count * 1.2, throttle_limit / partition_count * 1.5); + if (!cntl->control(1)) { + fail_count++; + } + } + ASSERT_GT(fail_count, 10000); + + sleep(1); + + fail_count = 0; + int fail_count1 = 0; + for (int i = 0; i < 200000; i++) { + if (i < 100000) { + token_bucket->consumeWithBorrowAndWait(1, + throttle_limit / partition_count * 1.2, + throttle_limit / partition_count * 1.5); + fail_count1 = fail_count; + } else { + token_bucket->consumeWithBorrowAndWait(1, + throttle_limit / partition_count * 0.2, + throttle_limit / partition_count * 0.3); + } + if (!cntl->control(1)) { + fail_count++; + } + } + ASSERT_GT(fail_count1, 10000); + ASSERT_LE(fail_count, fail_count1 * 1.2); + } +}; + +TEST_F(token_bucket_throttling_controller_test, test_parse_env_basic_token_bucket_throttling) +{ + test_parse_env_basic_token_bucket_throttling(); +} + +TEST_F(token_bucket_throttling_controller_test, throttle_test) { throttle_test(); } +} // namespace utils +} // namespace dsn diff --git a/src/utils/token_bucket_throttling_controller.cpp b/src/utils/token_bucket_throttling_controller.cpp new file mode 100644 index 0000000000..2f67e4394b --- /dev/null +++ b/src/utils/token_bucket_throttling_controller.cpp @@ -0,0 +1,148 @@ +#include + +namespace dsn { +namespace utils { + +token_bucket_throttling_controller::token_bucket_throttling_controller() + : _enabled(false), _partition_count(0), _rate(0), _burstsize(0) +{ + _token_bucket = std::make_unique(); +} + +token_bucket_throttling_controller::token_bucket_throttling_controller( + dsn::perf_counter_wrapper *reject_task_counter) + : _enabled(false), _partition_count(0), _rate(0), _burstsize(0) + { + _token_bucket = std::make_unique(); + + dassert(reject_task_counter, "reject_task_counter == nullptr"); + _reject_task_counter = reject_task_counter; + } + + void token_bucket_throttling_controller::only_count(int32_t request_units) + { + if (!_enabled) { + return ; + } + _token_bucket->consumeWithBorrowNonBlocking((double)request_units, _rate, _burstsize); + } + + bool token_bucket_throttling_controller::control(int32_t request_units = 1) + { + if (!_enabled) { + return true; + } + auto res = + _token_bucket->consumeWithBorrowNonBlocking((double)request_units, _rate, _burstsize); + + if (res.get_value_or(0) > 0) { + if (_reject_task_counter){ + _reject_task_counter->operator->()->increment(); + } + return false; + } + return true; + } + + void token_bucket_throttling_controller::reset(bool &changed, std::string &old_env_value) + { + if (_enabled) { + changed = true; + old_env_value = _env_value; + _enabled = false; + _env_value.clear(); + _partition_count = 0; + _rate = 0; + _burstsize = 0; + } else { + changed = false; + } + } + + // return the current env value. + const std::string &token_bucket_throttling_controller::env_value() const { return _env_value; } + + bool token_bucket_throttling_controller::parse_from_env(const std::string &env_value, + int partition_count, + std::string &parse_error, + bool &changed, + std::string &old_env_value) + { + changed = false; + if (_enabled && env_value == _env_value && partition_count == _partition_count) + return true; + + int64_t reject_size_value; + if (!transform_env_string(env_value, reject_size_value, parse_error)) { + return false; + } + + changed = true; + old_env_value = _env_value; + _enabled = true; + _env_value = env_value; + _partition_count = partition_count; + _rate = reject_size_value / (partition_count > 0 ? partition_count : 1); + _burstsize = _rate; + return true; + } + + bool token_bucket_throttling_controller::string_to_value(std::string str, int64_t &value) + { + int64_t unit_multiplier = 1; + if (*str.rbegin() == 'M') { + unit_multiplier = 1000 * 1000; + } else if (*str.rbegin() == 'K') { + unit_multiplier = 1000; + } + if (unit_multiplier != 1) { + str.pop_back(); + } + if (!buf2int64(str, value) || value < 0) { + return false; + } + value *= unit_multiplier; + return true; + } + + bool token_bucket_throttling_controller::validate(const std::string &env, + std::string &hint_message) + { + int64_t temp; + bool validated = transform_env_string(env, temp, hint_message); + return validated; + }; + + bool token_bucket_throttling_controller::transform_env_string(const std::string &env, + int64_t &reject_size_value, + std::string &hint_message) + { + + if (buf2int64(env, reject_size_value) && reject_size_value >= 0) { + return true; + } + + // format like "200K" + if (string_to_value(env, reject_size_value)) { + return true; + } + + // format like "20000*delay*100" + if (env.find("delay") != -1 && env.find("reject") == -1) { + reject_size_value = INT_MAX; + return true; + } + + // format like "20000*delay*100,20000*reject*100" + auto comma_index = env.find(","); + auto star_index = env.find("*reject", comma_index + 1); + if (star_index < 0) { + return false; + } + auto reject_size = env.substr(comma_index + 1, star_index - comma_index - 1); + + return string_to_value(reject_size, reject_size_value); + } + + } // namespace utils + } // namespace dsn \ No newline at end of file From d70037ef097106e16f2c69d3697e977a1259900d Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Tue, 19 Oct 2021 18:51:35 +0800 Subject: [PATCH 2/3] fix unit tst --- include/dsn/cpp/rpc_holder.h | 2 +- .../dist/replication/replication_app_base.h | 2 +- .../token_bucket_throttling_controller.h | 16 +- src/replica/replica.h | 3 +- ...oken_bucket_throttling_controller_test.cpp | 60 +++-- .../token_bucket_throttling_controller.cpp | 252 ++++++++++-------- 6 files changed, 198 insertions(+), 137 deletions(-) diff --git a/include/dsn/cpp/rpc_holder.h b/include/dsn/cpp/rpc_holder.h index bba436b31c..52a5321ba0 100644 --- a/include/dsn/cpp/rpc_holder.h +++ b/include/dsn/cpp/rpc_holder.h @@ -114,7 +114,7 @@ class rpc_holder return _i->thrift_response; } - dsn::error_code &error() const + dsn::error_code &error() const { dassert(_i, "rpc_holder is uninitialized"); return _i->rpc_error; diff --git a/include/dsn/dist/replication/replication_app_base.h b/include/dsn/dist/replication/replication_app_base.h index 5bbe122889..3c45b9f211 100644 --- a/include/dsn/dist/replication/replication_app_base.h +++ b/include/dsn/dist/replication/replication_app_base.h @@ -296,7 +296,7 @@ class replication_app_base : public replica_base explicit replication_app_base(::dsn::replication::replica *replica); - perf_counter_wrapper* get_counter_recent_read_throttling_reject_count(); + perf_counter_wrapper *get_counter_recent_read_throttling_reject_count(); }; } // namespace replication diff --git a/include/dsn/utils/token_bucket_throttling_controller.h b/include/dsn/utils/token_bucket_throttling_controller.h index 5d8ea59f8d..c911ea739b 100644 --- a/include/dsn/utils/token_bucket_throttling_controller.h +++ b/include/dsn/utils/token_bucket_throttling_controller.h @@ -44,12 +44,14 @@ class token_bucket_throttling_controller int32_t _partition_count = 0; double _rate; double _burstsize; - dsn::perf_counter_wrapper* _reject_task_counter; + + bool is_get_perfcounter = false; + dsn::perf_counter_wrapper *_reject_task_counter; public: token_bucket_throttling_controller(); - token_bucket_throttling_controller(dsn::perf_counter_wrapper* reject_task_counter); + token_bucket_throttling_controller(dsn::perf_counter_wrapper *reject_task_counter); bool control(int32_t request_units); @@ -65,13 +67,15 @@ class token_bucket_throttling_controller bool &changed, std::string &old_env_value); - static bool string_to_value(std::string str,int64_t &value); + static bool string_to_value(std::string str, int64_t &value); static bool validate(const std::string &env, std::string &hint_message); - static bool transform_env_string(const std::string &env, int64_t &reject_size_value,std::string &hint_message); - + static bool transform_env_string(const std::string &env, + int64_t &reject_size_value, + bool &enabled, + std::string &hint_message); }; -} // namespace replication +} // namespace utils } // namespace dsn diff --git a/src/replica/replica.h b/src/replica/replica.h index e9ab3bd597..a6c420db13 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -467,7 +467,8 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_disk_migrator; friend class replica_disk_test; friend class replica_disk_migrate_test; - friend perf_counter_wrapper* replication_app_base::get_counter_recent_read_throttling_reject_count(); + friend perf_counter_wrapper * + replication_app_base::get_counter_recent_read_throttling_reject_count(); // replica configuration, updated by update_local_configuration ONLY replica_configuration _config; diff --git a/src/utils/test/token_bucket_throttling_controller_test.cpp b/src/utils/test/token_bucket_throttling_controller_test.cpp index 198cb71481..929fa5decc 100644 --- a/src/utils/test/token_bucket_throttling_controller_test.cpp +++ b/src/utils/test/token_bucket_throttling_controller_test.cpp @@ -33,49 +33,71 @@ class token_bucket_throttling_controller_test : public ::testing::Test bool env_changed = false; std::string old_value; + int partition_count = 4; + std::string old_env = ""; + std::string env = "20000*delay*100"; // token_bucket_throttling_controller doesn't support delay only - ASSERT_TRUE(cntl.parse_from_env("20000*delay*100", 4, parse_err, env_changed, old_value)); + ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value)); + ASSERT_EQ(cntl._env_value, env); + ASSERT_EQ(cntl._partition_count, 4); + ASSERT_EQ(cntl._burstsize, 0); + ASSERT_EQ(cntl._rate, 0); + ASSERT_EQ(cntl._enabled, false); + ASSERT_EQ(env_changed, true); + ASSERT_EQ(old_value, old_env); + ASSERT_EQ(parse_err, ""); + + old_env = env; + env = "200K"; + ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value)); ASSERT_EQ(cntl._enabled, true); - ASSERT_EQ(cntl._env_value, "20000*delay*100"); + ASSERT_EQ(cntl._env_value, env); ASSERT_EQ(cntl._partition_count, 4); - ASSERT_EQ(cntl._rate, INT_MAX); - ASSERT_EQ(cntl._burstsize, INT_MAX); + ASSERT_EQ(cntl._rate, 200000 / partition_count); + ASSERT_EQ(cntl._burstsize, 200000 / partition_count); ASSERT_EQ(env_changed, true); - ASSERT_EQ(old_value, ""); + ASSERT_EQ(old_value, old_env); ASSERT_EQ(parse_err, ""); - ASSERT_TRUE(cntl.parse_from_env("200K", 4, parse_err, env_changed, old_value)); + old_env = env; + env = "20000*delay*100,20000*reject*100"; + ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value)); ASSERT_EQ(cntl._enabled, true); - ASSERT_EQ(cntl._env_value, "200K"); + ASSERT_EQ(cntl._env_value, env); ASSERT_EQ(cntl._partition_count, 4); - ASSERT_EQ(cntl._rate, 200000); - ASSERT_EQ(cntl._burstsize, 200000); + ASSERT_EQ(cntl._rate, 20000 / partition_count); + ASSERT_EQ(cntl._burstsize, 20000 / partition_count); ASSERT_EQ(env_changed, true); - ASSERT_EQ(old_value, "20000*delay*100"); + ASSERT_EQ(old_value, old_env); ASSERT_EQ(parse_err, ""); - ASSERT_TRUE(cntl.parse_from_env( - "20000*delay*100,20000*reject*100", 4, parse_err, env_changed, old_value)); + old_env = env; + env = "20000*reject*100"; + ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value)); ASSERT_EQ(cntl._enabled, true); - ASSERT_EQ(cntl._env_value, "20000*delay*100,20000*reject*100"); + ASSERT_EQ(cntl._env_value, env); ASSERT_EQ(cntl._partition_count, 4); - ASSERT_EQ(cntl._rate, 20000); - ASSERT_EQ(cntl._burstsize, 20000); + ASSERT_EQ(cntl._rate, 20000 / partition_count); + ASSERT_EQ(cntl._burstsize, 20000 / partition_count); ASSERT_EQ(env_changed, true); - ASSERT_EQ(old_value, "200K"); + ASSERT_EQ(old_value, old_env); ASSERT_EQ(parse_err, ""); // invalid argument - - ASSERT_FALSE(cntl.parse_from_env("*delay*100", 4, parse_err, env_changed, old_value)); + old_env = env; + env = "*deldday*100"; + ASSERT_FALSE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value)); ASSERT_EQ(env_changed, false); - ASSERT_NE(parse_err, ""); + ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K"); ASSERT_EQ(cntl._enabled, true); // ensure invalid env won't stop throttling + ASSERT_EQ(old_value, old_env); ASSERT_FALSE(cntl.parse_from_env("", 4, parse_err, env_changed, old_value)); ASSERT_EQ(env_changed, false); ASSERT_NE(parse_err, ""); + ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K"); ASSERT_EQ(cntl._enabled, true); + ASSERT_EQ(old_value, old_env); } void throttle_test() diff --git a/src/utils/token_bucket_throttling_controller.cpp b/src/utils/token_bucket_throttling_controller.cpp index 2f67e4394b..7f19d6c6ec 100644 --- a/src/utils/token_bucket_throttling_controller.cpp +++ b/src/utils/token_bucket_throttling_controller.cpp @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #include namespace dsn { @@ -7,142 +24,159 @@ token_bucket_throttling_controller::token_bucket_throttling_controller() : _enabled(false), _partition_count(0), _rate(0), _burstsize(0) { _token_bucket = std::make_unique(); + is_get_perfcounter = false; } token_bucket_throttling_controller::token_bucket_throttling_controller( dsn::perf_counter_wrapper *reject_task_counter) : _enabled(false), _partition_count(0), _rate(0), _burstsize(0) - { - _token_bucket = std::make_unique(); +{ + _token_bucket = std::make_unique(); - dassert(reject_task_counter, "reject_task_counter == nullptr"); - _reject_task_counter = reject_task_counter; - } + dassert(reject_task_counter, "reject_task_counter == nullptr"); + _reject_task_counter = reject_task_counter; + is_get_perfcounter = true; +} - void token_bucket_throttling_controller::only_count(int32_t request_units) - { - if (!_enabled) { - return ; - } - _token_bucket->consumeWithBorrowNonBlocking((double)request_units, _rate, _burstsize); +void token_bucket_throttling_controller::only_count(int32_t request_units) +{ + if (!_enabled) { + return; } + _token_bucket->consumeWithBorrowNonBlocking((double)request_units, _rate, _burstsize); +} - bool token_bucket_throttling_controller::control(int32_t request_units = 1) - { - if (!_enabled) { - return true; - } - auto res = - _token_bucket->consumeWithBorrowNonBlocking((double)request_units, _rate, _burstsize); - - if (res.get_value_or(0) > 0) { - if (_reject_task_counter){ - _reject_task_counter->operator->()->increment(); - } - return false; - } +bool token_bucket_throttling_controller::control(int32_t request_units = 1) +{ + if (!_enabled) { return true; } + auto res = + _token_bucket->consumeWithBorrowNonBlocking((double)request_units, _rate, _burstsize); - void token_bucket_throttling_controller::reset(bool &changed, std::string &old_env_value) - { - if (_enabled) { - changed = true; - old_env_value = _env_value; - _enabled = false; - _env_value.clear(); - _partition_count = 0; - _rate = 0; - _burstsize = 0; - } else { - changed = false; + if (res.get_value_or(0) > 0) { + if (is_get_perfcounter) { + _reject_task_counter->operator->()->increment(); } + return false; } + return true; +} - // return the current env value. - const std::string &token_bucket_throttling_controller::env_value() const { return _env_value; } - - bool token_bucket_throttling_controller::parse_from_env(const std::string &env_value, - int partition_count, - std::string &parse_error, - bool &changed, - std::string &old_env_value) - { +void token_bucket_throttling_controller::reset(bool &changed, std::string &old_env_value) +{ + if (_enabled) { + changed = true; + old_env_value = _env_value; + _enabled = false; + _env_value.clear(); + _partition_count = 0; + _rate = 0; + _burstsize = 0; + } else { changed = false; - if (_enabled && env_value == _env_value && partition_count == _partition_count) - return true; + } +} - int64_t reject_size_value; - if (!transform_env_string(env_value, reject_size_value, parse_error)) { - return false; - } +// return the current env value. +const std::string &token_bucket_throttling_controller::env_value() const { return _env_value; } - changed = true; - old_env_value = _env_value; - _enabled = true; - _env_value = env_value; - _partition_count = partition_count; - _rate = reject_size_value / (partition_count > 0 ? partition_count : 1); - _burstsize = _rate; +bool token_bucket_throttling_controller::parse_from_env(const std::string &env_value, + int partition_count, + std::string &parse_error, + bool &changed, + std::string &old_env_value) +{ + old_env_value = _env_value; + changed = false; + + if (_enabled && env_value == _env_value && partition_count == _partition_count) return true; + + int64_t reject_size_value; + bool enabled; + if (!transform_env_string(env_value, reject_size_value, enabled, parse_error)) { + return false; } - bool token_bucket_throttling_controller::string_to_value(std::string str, int64_t &value) - { - int64_t unit_multiplier = 1; - if (*str.rbegin() == 'M') { - unit_multiplier = 1000 * 1000; - } else if (*str.rbegin() == 'K') { - unit_multiplier = 1000; - } - if (unit_multiplier != 1) { - str.pop_back(); - } - if (!buf2int64(str, value) || value < 0) { - return false; - } - value *= unit_multiplier; + changed = true; + + _enabled = enabled; + _env_value = env_value; + _partition_count = partition_count; + _rate = reject_size_value / (partition_count > 0 ? partition_count : 1); + _burstsize = _rate; + return true; +} + +bool token_bucket_throttling_controller::string_to_value(std::string str, int64_t &value) +{ + int64_t unit_multiplier = 1; + if (*str.rbegin() == 'M') { + unit_multiplier = 1000 * 1000; + } else if (*str.rbegin() == 'K') { + unit_multiplier = 1000; + } + if (unit_multiplier != 1) { + str.pop_back(); + } + if (!buf2int64(str, value) || value < 0) { + return false; + } + value *= unit_multiplier; + return true; +} + +bool token_bucket_throttling_controller::validate(const std::string &env, std::string &hint_message) +{ + int64_t temp; + bool temp_bool; + bool validated = transform_env_string(env, temp, temp_bool, hint_message); + return validated; +}; + +bool token_bucket_throttling_controller::transform_env_string(const std::string &env, + int64_t &reject_size_value, + bool &enabled, + std::string &hint_message) +{ + enabled = true; + + if (buf2int64(env, reject_size_value) && reject_size_value >= 0) { return true; } - bool token_bucket_throttling_controller::validate(const std::string &env, - std::string &hint_message) - { - int64_t temp; - bool validated = transform_env_string(env, temp, hint_message); - return validated; - }; - - bool token_bucket_throttling_controller::transform_env_string(const std::string &env, - int64_t &reject_size_value, - std::string &hint_message) - { - - if (buf2int64(env, reject_size_value) && reject_size_value >= 0) { - return true; - } + // format like "200K" + if (string_to_value(env, reject_size_value)) { + return true; + } - // format like "200K" - if (string_to_value(env, reject_size_value)) { - return true; - } + // format like "20000*delay*100" + if (env.find("delay") != -1 && env.find("reject") == -1) { + reject_size_value = 0; + enabled = false; - // format like "20000*delay*100" - if (env.find("delay") != -1 && env.find("reject") == -1) { - reject_size_value = INT_MAX; - return true; - } + dinfo("token_bucket_throttling_controller doesn't support delay method, so throttling " + "controller is disabled now"); + return true; + } - // format like "20000*delay*100,20000*reject*100" - auto comma_index = env.find(","); - auto star_index = env.find("*reject", comma_index + 1); - if (star_index < 0) { - return false; - } - auto reject_size = env.substr(comma_index + 1, star_index - comma_index - 1); + // format like "20000*delay*100,20000*reject*100" + auto comma_index = env.find(","); + auto star_index = env.find("*reject", comma_index + 1); + if (star_index < 0) { + hint_message = "wrong format, you can set like 20000 or 20K"; + return false; + } + auto reject_size = env.substr(comma_index + 1, star_index - comma_index - 1); - return string_to_value(reject_size, reject_size_value); + if (string_to_value(reject_size, reject_size_value)) { + return true; } - } // namespace utils - } // namespace dsn \ No newline at end of file + hint_message = "wrong format, you can set like 20000 or 20K"; + return false; +} + +} // namespace utils +} // namespace dsn \ No newline at end of file From 9f543cc56da166c1dbe70f1cbca87d713d020079 Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Tue, 19 Oct 2021 18:52:34 +0800 Subject: [PATCH 3/3] add a line --- src/utils/token_bucket_throttling_controller.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/token_bucket_throttling_controller.cpp b/src/utils/token_bucket_throttling_controller.cpp index 7f19d6c6ec..34e61cfe42 100644 --- a/src/utils/token_bucket_throttling_controller.cpp +++ b/src/utils/token_bucket_throttling_controller.cpp @@ -179,4 +179,4 @@ bool token_bucket_throttling_controller::transform_env_string(const std::string } } // namespace utils -} // namespace dsn \ No newline at end of file +} // namespace dsn