Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: support read throttling by size #938

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion include/dsn/cpp/rpc_holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ class rpc_holder
return _i->thrift_response;
}

dsn::error_code &error() const
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can add some unit tests for rpc_holder

{
dassert(_i, "rpc_holder is uninitialized");
return _i->rpc_error;
}

message_ex *dsn_request() const
{
dassert(_i, "rpc_holder is uninitialized");
Expand Down Expand Up @@ -291,7 +297,7 @@ class rpc_holder

message_ex *dsn_response = dsn_request->create_response();
marshall(dsn_response, thrift_response);
dsn_rpc_reply(dsn_response);
dsn_rpc_reply(dsn_response, rpc_error);
}

~internal()
Expand All @@ -305,6 +311,7 @@ class rpc_holder
message_ex *dsn_request;
std::unique_ptr<TRequest> thrift_request;
TResponse thrift_response;
dsn::error_code rpc_error = dsn::ERR_OK;

bool auto_reply;
};
Expand Down
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class replica_envs
static const std::string BUSINESS_INFO;
static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS;
static const std::string READ_QPS_THROTTLING;
static const std::string READ_SIZE_THROTTLING;
static const std::string BACKUP_REQUEST_QPS_THROTTLING;
static const std::string SPLIT_VALIDATE_PARTITION_HASH;
static const std::string USER_SPECIFIED_COMPACTION;
Expand Down
3 changes: 3 additions & 0 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class replication_app_base : public replica_base
const std::string &learn_dir() const { return _dir_learn; }
const std::string &backup_dir() const { return _dir_backup; }
const std::string &bulk_load_dir() const { return _dir_bulk_load; }
const app_info *get_app_info() const;
::dsn::replication::decree last_committed_decree() const
{
return _last_committed_decree.load();
Expand Down Expand Up @@ -294,6 +295,8 @@ class replication_app_base : public replica_base
replica_init_info _info;

explicit replication_app_base(::dsn::replication::replica *replica);

perf_counter_wrapper *get_counter_recent_read_throttling_reject_count();
};

} // namespace replication
Expand Down
81 changes: 81 additions & 0 deletions include/dsn/utils/token_bucket_throttling_controller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdint.h>
#include <string>
#include <dsn/utility/TokenBucket.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/strings.h>
#include <dsn/utility/string_conv.h>
#include <dsn/c/api_layer1.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>

namespace dsn {
namespace utils {

using DynamicTokenBucket = folly::BasicDynamicTokenBucket<std::chrono::steady_clock>;

// token_bucket_throttling_controller ignores `delay` parameter
class token_bucket_throttling_controller
{
private:
friend class token_bucket_throttling_controller_test;

std::unique_ptr<DynamicTokenBucket> _token_bucket;

bool _enabled;
std::string _env_value;
int32_t _partition_count = 0;
double _rate;
double _burstsize;

bool is_get_perfcounter = false;
dsn::perf_counter_wrapper *_reject_task_counter;

public:
token_bucket_throttling_controller();

token_bucket_throttling_controller(dsn::perf_counter_wrapper *reject_task_counter);

bool control(int32_t request_units);

void only_count(int32_t request_units);

void reset(bool &changed, std::string &old_env_value);
// return the current env value.
const std::string &env_value() const;

bool parse_from_env(const std::string &env_value,
int partition_count,
std::string &parse_error,
bool &changed,
std::string &old_env_value);

static bool string_to_value(std::string str, int64_t &value);

static bool validate(const std::string &env, std::string &hint_message);

static bool transform_env_string(const std::string &env,
int64_t &reject_size_value,
bool &enabled,
std::string &hint_message);
};

} // namespace utils
} // namespace dsn
1 change: 1 addition & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ const std::string replica_envs::BUSINESS_INFO("business.info");
const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS(
"replica_access_controller.allowed_users");
const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling");
const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size");
const std::string
replica_envs::SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compaction");
Expand Down
5 changes: 5 additions & 0 deletions src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <dsn/utility/string_conv.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replica_envs.h>
#include <dsn/utils/token_bucket_throttling_controller.h>
#include "app_env_validator.h"

namespace dsn {
Expand Down Expand Up @@ -185,6 +186,10 @@ void app_env_validator::register_all_validators()
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
{replica_envs::READ_QPS_THROTTLING,
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::READ_SIZE_THROTTLING,
std::bind(&utils::token_bucket_throttling_controller::validate,
std::placeholders::_1,
std::placeholders::_2)},
{replica_envs::SPLIT_VALIDATE_PARTITION_HASH,
std::bind(&check_split_validation, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::USER_SPECIFIED_COMPACTION, nullptr},
Expand Down
4 changes: 3 additions & 1 deletion src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/replication/replica_base.h>
#include <dsn/dist/replication/replication_app_base.h>

#include "common/replication_common.h"
#include "mutation.h"
Expand All @@ -62,7 +63,6 @@ class access_controller;
} // namespace security
namespace replication {

class replication_app_base;
class replica_stub;
class replica_duplicator_manager;
class replica_backup_manager;
Expand Down Expand Up @@ -467,6 +467,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_disk_migrator;
friend class replica_disk_test;
friend class replica_disk_migrate_test;
friend perf_counter_wrapper *
replication_app_base::get_counter_recent_read_throttling_reject_count();

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down
12 changes: 12 additions & 0 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,5 +603,17 @@ ::dsn::error_code replication_app_base::update_init_info_ballot_and_decree(repli
_info.init_offset_in_private_log,
r->last_durable_decree());
}

const app_info *replication_app_base::get_app_info() const { return _replica->get_app_info(); }

perf_counter_wrapper *replication_app_base::get_counter_recent_read_throttling_reject_count()
{
dassert(_replica, "_replica is null");
dassert(&_replica->_counter_recent_read_throttling_reject_count,
"_counter_recent_read_throttling_reject_count is null");

return &_replica->_counter_recent_read_throttling_reject_count;
}

} // namespace replication
} // namespace dsn
169 changes: 169 additions & 0 deletions src/utils/test/token_bucket_throttling_controller_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/utils/token_bucket_throttling_controller.h>
#include <dsn/utility/TokenBucket.h>

#include <gtest/gtest.h>

namespace dsn {
namespace utils {

class token_bucket_throttling_controller_test : public ::testing::Test
{
public:
void test_parse_env_basic_token_bucket_throttling()
{
token_bucket_throttling_controller cntl;
std::string parse_err;
bool env_changed = false;
std::string old_value;

int partition_count = 4;
std::string old_env = "";
std::string env = "20000*delay*100";
// token_bucket_throttling_controller doesn't support delay only
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._burstsize, 0);
ASSERT_EQ(cntl._rate, 0);
ASSERT_EQ(cntl._enabled, false);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

old_env = env;
env = "200K";
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._rate, 200000 / partition_count);
ASSERT_EQ(cntl._burstsize, 200000 / partition_count);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

old_env = env;
env = "20000*delay*100,20000*reject*100";
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._rate, 20000 / partition_count);
ASSERT_EQ(cntl._burstsize, 20000 / partition_count);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

old_env = env;
env = "20000*reject*100";
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._rate, 20000 / partition_count);
ASSERT_EQ(cntl._burstsize, 20000 / partition_count);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

// invalid argument
old_env = env;
env = "*deldday*100";
ASSERT_FALSE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(env_changed, false);
ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K");
ASSERT_EQ(cntl._enabled, true); // ensure invalid env won't stop throttling
ASSERT_EQ(old_value, old_env);

ASSERT_FALSE(cntl.parse_from_env("", 4, parse_err, env_changed, old_value));
ASSERT_EQ(env_changed, false);
ASSERT_NE(parse_err, "");
ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K");
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(old_value, old_env);
}

void throttle_test()
{
auto cntl = std::make_unique<token_bucket_throttling_controller>();
std::string parse_err;
bool env_changed = false;
std::string old_value;
const int partition_count = 4;

int throttle_limit = 200000;
cntl->parse_from_env(
std::to_string(throttle_limit), partition_count, parse_err, env_changed, old_value);

auto token_bucket = std::make_unique<DynamicTokenBucket>();
int fail_count = 0;
for (int i = 0; i < 100000; i++) {
token_bucket->consumeWithBorrowAndWait(
1, throttle_limit / partition_count * 0.8, throttle_limit / partition_count * 1.0);
if (!cntl->control(1)) {
fail_count++;
}
}
ASSERT_EQ(fail_count, 0);

sleep(1);

fail_count = 0;
for (int i = 0; i < 100000; i++) {
token_bucket->consumeWithBorrowAndWait(
1, throttle_limit / partition_count * 1.2, throttle_limit / partition_count * 1.5);
if (!cntl->control(1)) {
fail_count++;
}
}
ASSERT_GT(fail_count, 10000);

sleep(1);

fail_count = 0;
int fail_count1 = 0;
for (int i = 0; i < 200000; i++) {
if (i < 100000) {
token_bucket->consumeWithBorrowAndWait(1,
throttle_limit / partition_count * 1.2,
throttle_limit / partition_count * 1.5);
fail_count1 = fail_count;
} else {
token_bucket->consumeWithBorrowAndWait(1,
throttle_limit / partition_count * 0.2,
throttle_limit / partition_count * 0.3);
}
if (!cntl->control(1)) {
fail_count++;
}
}
ASSERT_GT(fail_count1, 10000);
ASSERT_LE(fail_count, fail_count1 * 1.2);
}
};

TEST_F(token_bucket_throttling_controller_test, test_parse_env_basic_token_bucket_throttling)
{
test_parse_env_basic_token_bucket_throttling();
}

TEST_F(token_bucket_throttling_controller_test, throttle_test) { throttle_test(); }
} // namespace utils
} // namespace dsn
Loading