Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add a token_bucket_throttling_controller #941

Merged
merged 15 commits into from
Oct 27, 2021
99 changes: 99 additions & 0 deletions include/dsn/utils/token_bucket_throttling_controller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <chrono>
#include <memory>

namespace folly {
template <typename Clock>
class BasicDynamicTokenBucket;

} // namespace folly

namespace dsn {
namespace utils {

using DynamicTokenBucket = folly::BasicDynamicTokenBucket<std::chrono::steady_clock>;

// token_bucket_throttling_controller ignores `delay` parameter
class token_bucket_throttling_controller
Smityz marked this conversation as resolved.
Show resolved Hide resolved
{
private:
friend class token_bucket_throttling_controller_test;

std::unique_ptr<DynamicTokenBucket> _token_bucket;

bool _enabled;
std::string _env_value;
int32_t _partition_count = 0;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
double _rate;
double _burstsize;

public:
token_bucket_throttling_controller();

// return ture means you can get token
// return false means the bucket is already empty, but the token is borrowed from future.
// non-blocking
bool get_token(int32_t request_units);

// reset to no throttling.
void reset(bool &changed, std::string &old_env_value);

// return the current env value.
const std::string &env_value() const;

// Configures throttling strategy dynamically from app-envs.
//
// Support two style format:
// 1. style: "20000*delay*100,20000*reject*100"
// example: 20000*delay*100,20000*reject*100
// result: reject 20000 request_units, but never delay
// example: 20000*delay*100
// result: never result or delay
Smityz marked this conversation as resolved.
Show resolved Hide resolved
// example: 20000*reject*100
// result: reject 20000 request_units
// 2. style: 20/"20K"/"20M"
// example: 20K
// result reject 20000 request_units
//
// return true if parse succeed.
// return false if parse failed for the reason of invalid env_value.
// if return false, the original value will not be changed.
// 'parse_error' is set when return false.
// 'changed' is set when return true.
// 'old_env_value' is set when 'changed' is set to true.
bool parse_from_env(const std::string &env_value,
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
int partition_count,
Smityz marked this conversation as resolved.
Show resolved Hide resolved
std::string &parse_error,
bool &changed,
std::string &old_env_value);

static bool string_to_value(std::string str, int64_t &value);

// wrapper of transform_env_string, check if the env string is validated.
static bool validate(const std::string &env, std::string &hint_message);

static bool transform_env_string(const std::string &env,
int64_t &reject_size_value,
bool &enabled,
std::string &hint_message);
};

} // namespace utils
} // namespace dsn
181 changes: 181 additions & 0 deletions src/utils/test/token_bucket_throttling_controller_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/utils/token_bucket_throttling_controller.h>
#include <dsn/utility/TokenBucket.h>

#include <gtest/gtest.h>

namespace dsn {
namespace utils {

class token_bucket_throttling_controller_test : public ::testing::Test
{
public:
void test_parse_env_basic_token_bucket_throttling()
{
token_bucket_throttling_controller cntl;
std::string parse_err;
bool env_changed = false;
std::string old_value;

int partition_count = 4;
std::string old_env = "";
std::string env = "20000*delay*100";
// token_bucket_throttling_controller doesn't support delay only
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._burstsize, 0);
ASSERT_EQ(cntl._rate, 0);
ASSERT_EQ(cntl._enabled, false);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

old_env = env;
env = "200K";
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._rate, 200000 / partition_count);
ASSERT_EQ(cntl._burstsize, 200000 / partition_count);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

old_env = env;
env = "20000*delay*100,20000*reject*100";
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._rate, 20000 / partition_count);
ASSERT_EQ(cntl._burstsize, 20000 / partition_count);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

old_env = env;
env = "20K*delay*100,20K*reject*100";
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._rate, 20000 / partition_count);
ASSERT_EQ(cntl._burstsize, 20000 / partition_count);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

old_env = env;
env = "20000*reject*100";
ASSERT_TRUE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(cntl._env_value, env);
ASSERT_EQ(cntl._partition_count, 4);
ASSERT_EQ(cntl._rate, 20000 / partition_count);
ASSERT_EQ(cntl._burstsize, 20000 / partition_count);
ASSERT_EQ(env_changed, true);
ASSERT_EQ(old_value, old_env);
ASSERT_EQ(parse_err, "");

// invalid argument
Smityz marked this conversation as resolved.
Show resolved Hide resolved
old_env = env;
env = "*deldday*100";
ASSERT_FALSE(cntl.parse_from_env(env, partition_count, parse_err, env_changed, old_value));
ASSERT_EQ(env_changed, false);
ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K");
ASSERT_EQ(cntl._enabled, true); // ensure invalid env won't stop throttling
ASSERT_EQ(old_value, old_env);

ASSERT_FALSE(cntl.parse_from_env("", 4, parse_err, env_changed, old_value));
ASSERT_EQ(env_changed, false);
ASSERT_NE(parse_err, "");
ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K");
ASSERT_EQ(cntl._enabled, true);
ASSERT_EQ(old_value, old_env);
}

void throttle_test()
{
auto cntl = std::make_unique<token_bucket_throttling_controller>();
std::string parse_err;
bool env_changed = false;
std::string old_value;
const int partition_count = 4;

int throttle_limit = 200000;
cntl->parse_from_env(
std::to_string(throttle_limit), partition_count, parse_err, env_changed, old_value);

auto token_bucket = std::make_unique<DynamicTokenBucket>();
int fail_count = 0;
for (int i = 0; i < 100000; i++) {
token_bucket->consumeWithBorrowAndWait(
1, throttle_limit / partition_count * 0.8, throttle_limit / partition_count * 1.0);
if (!cntl->get_token(1)) {
fail_count++;
}
}
ASSERT_EQ(fail_count, 0);

sleep(1);

fail_count = 0;
for (int i = 0; i < 100000; i++) {
token_bucket->consumeWithBorrowAndWait(
1, throttle_limit / partition_count * 1.2, throttle_limit / partition_count * 1.5);
if (!cntl->get_token(1)) {
fail_count++;
}
}
ASSERT_GT(fail_count, 10000);

sleep(1);

fail_count = 0;
int fail_count1 = 0;
for (int i = 0; i < 200000; i++) {
if (i < 100000) {
token_bucket->consumeWithBorrowAndWait(1,
throttle_limit / partition_count * 1.2,
throttle_limit / partition_count * 1.5);
fail_count1 = fail_count;
} else {
token_bucket->consumeWithBorrowAndWait(1,
throttle_limit / partition_count * 0.2,
throttle_limit / partition_count * 0.3);
}
if (!cntl->get_token(1)) {
fail_count++;
}
}
ASSERT_GT(fail_count1, 10000);
ASSERT_LE(fail_count, fail_count1 * 1.2);
}
};

TEST_F(token_bucket_throttling_controller_test, test_parse_env_basic_token_bucket_throttling)
{
test_parse_env_basic_token_bucket_throttling();
}

TEST_F(token_bucket_throttling_controller_test, throttle_test) { throttle_test(); }
} // namespace utils
} // namespace dsn
Loading