From 275c3ced0b93af0ffeea0735d1a7f8007c999e7e Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Fri, 24 Apr 2020 13:31:44 +0800 Subject: [PATCH] feat(util): add token bucket for rate limit --- include/dsn/utility/TokenBucket.h | 507 +++++++++++++++++++++++++++++ src/core/tests/TokenBucketTest.cpp | 131 ++++++++ src/core/tests/TokenBucketTest.h | 32 ++ 3 files changed, 670 insertions(+) create mode 100644 include/dsn/utility/TokenBucket.h create mode 100644 src/core/tests/TokenBucketTest.cpp create mode 100644 src/core/tests/TokenBucketTest.h diff --git a/include/dsn/utility/TokenBucket.h b/include/dsn/utility/TokenBucket.h new file mode 100644 index 0000000000..5dc4e82efe --- /dev/null +++ b/include/dsn/utility/TokenBucket.h @@ -0,0 +1,507 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace folly { + +/** + * Thread-safe (atomic) token bucket implementation. + * + * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream + * of events with an average rate and some amount of burstiness. The canonical + * example is a packet switched network: the network can accept some number of + * bytes per second and the bytes come in finite packets (bursts). A token + * bucket stores up to a fixed number of tokens (the burst size). Some number + * of tokens are removed when an event occurs. The tokens are replenished at a + * fixed rate. Failure to allocate tokens implies resource is unavailable and + * caller needs to implement its own retry mechanism. For simple cases where + * caller is okay with a FIFO starvation-free scheduling behavior, there are + * also APIs to 'borrow' from the future effectively assigning a start time to + * the caller when it should proceed with using the resource. It is also + * possible to 'return' previously allocated tokens to make them available to + * other users. Returns in excess of burstSize are considered expired and + * will not be available to later callers. + * + * This implementation records the last time it was updated. This allows the + * token bucket to add tokens "just in time" when tokens are requested. + * + * The "dynamic" base variant allows the token generation rate and maximum + * burst size to change with every token consumption. + * + * @tparam Clock Clock type, must be steady i.e. monotonic. + */ +template +class BasicDynamicTokenBucket +{ + static_assert(Clock::is_steady, "clock must be steady"); + +public: + /** + * Constructor. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * buckets are "full" after construction. + */ + explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept : zeroTime_(zeroTime) {} + + /** + * Copy constructor. + * + * Thread-safe. (Copy constructors of derived classes may not be thread-safe + * however.) + */ + BasicDynamicTokenBucket(const BasicDynamicTokenBucket &other) noexcept + : zeroTime_(other.zeroTime_.load()) + { + } + + /** + * Copy-assignment operator. + * + * Warning: not thread safe for the object being assigned to (including + * self-assignment). Thread-safe for the other object. + */ + BasicDynamicTokenBucket &operator=(const BasicDynamicTokenBucket &other) noexcept + { + zeroTime_ = other.zeroTime_.load(); + return *this; + } + + /** + * Re-initialize token bucket. + * + * Thread-safe. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is reset to "full". + */ + void reset(double zeroTime = 0) noexcept { zeroTime_ = zeroTime; } + + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept + { + auto const now = Clock::now().time_since_epoch(); + return std::chrono::duration(now).count(); + } + + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (nowInSeconds <= zeroTime_.load()) { + return 0; + } + + return consumeImpl(rate, burstSize, nowInSeconds, [toConsume](double &tokens) { + if (tokens < toConsume) { + return false; + } + tokens -= toConsume; + return true; + }); + } + + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (nowInSeconds <= zeroTime_.load()) { + return 0; + } + + double consumed; + consumeImpl(rate, burstSize, nowInSeconds, [&consumed, toConsume](double &tokens) { + if (tokens < toConsume) { + consumed = tokens; + tokens = 0.0; + } else { + consumed = toConsume; + tokens -= toConsume; + } + return true; + }); + return consumed; + } + + /** + * Return extra tokens back to the bucket. This will move the zeroTime_ + * value back based on the rate. + * + * Thread-safe. + */ + void returnTokens(double tokensToReturn, double rate) + { + assert(rate > 0); + assert(tokensToReturn > 0); + + returnTokensImpl(tokensToReturn, rate); + } + + /** + * Like consumeOrDrain but the call will always satisfy the asked for count. + * It does so by borrowing tokens from the future (zeroTime_ will move + * forward) if the currently available count isn't sufficient. + * + * Returns a folly::Optional. The optional wont be set if the request + * cannot be satisfied: only case is when it is larger than burstSize. The + * value of the optional is a double indicating the time in seconds that the + * caller needs to wait at which the reservation becomes valid. The caller + * could simply sleep for the returned duration to smooth out the allocation + * to match the rate limiter or do some other computation in the meantime. In + * any case, any regular consume or consumeOrDrain calls will fail to allocate + * any tokens until the future time is reached. + * + * Note: It is assumed the caller will not ask for a very large count nor use + * it immediately (if not waiting inline) as that would break the burst + * prevention the limiter is meant to be used for. + * + * Thread-safe. + */ + boost::optional consumeWithBorrowNonBlocking(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + assert(rate > 0); + assert(burstSize > 0); + + if (burstSize < toConsume) { + return boost::none; + } + + while (toConsume > 0) { + double consumed = consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); + if (consumed > 0) { + toConsume -= consumed; + } else { + double zeroTimeNew = returnTokensImpl(-toConsume, rate); + double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); + return boost::optional(napTime); + } + } + return boost::optional(0); + } + + /** + * Convenience wrapper around non-blocking borrow to sleep inline until + * reservation is valid. + */ + bool consumeWithBorrowAndWait(double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) + { + auto res = consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); + if (res.get_value_or(0) > 0) { + int64_t napUSec = res.get() * 1000000; + std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); + } + return res.is_initialized(); + } + + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double rate, double burstSize, double nowInSeconds = defaultClockNow()) const + noexcept + { + assert(rate > 0); + assert(burstSize > 0); + + double zt = this->zeroTime_.load(); + if (nowInSeconds <= zt) { + return 0; + } + return std::min((nowInSeconds - zt) * rate, burstSize); + } + +private: + template + bool consumeImpl(double rate, double burstSize, double nowInSeconds, const TCallback &callback) + { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); + if (!callback(tokens)) { + return false; + } + zeroTimeNew = nowInSeconds - tokens / rate; + } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); + + return true; + } + + /** + * Adjust zeroTime based on rate and tokenCount and return the new value of + * zeroTime_. Note: Token count can be negative to move the zeroTime_ value + * into the future. + */ + double returnTokensImpl(double tokenCount, double rate) + { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + zeroTimeNew = zeroTimeOld - tokenCount / rate; + } while (dsn_unlikely(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); + return zeroTimeNew; + } + + std::atomic zeroTime_; +}; + +/** + * Specialization of BasicDynamicTokenBucket with a fixed token + * generation rate and a fixed maximum burst size. + */ +template +class BasicTokenBucket +{ + static_assert(Clock::is_steady, "clock must be steady"); + +private: + using Impl = BasicDynamicTokenBucket; + +public: + /** + * Construct a token bucket with a specific maximum rate and burst size. + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is "full" after construction. + */ + BasicTokenBucket(double genRate, double burstSize, double zeroTime = 0) noexcept + : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) + { + assert(rate_ > 0); + assert(burstSize_ > 0); + } + + /** + * Copy constructor. + * + * Warning: not thread safe! + */ + BasicTokenBucket(const BasicTokenBucket &other) noexcept = default; + + /** + * Copy-assignment operator. + * + * Warning: not thread safe! + */ + BasicTokenBucket &operator=(const BasicTokenBucket &other) noexcept = default; + + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) + { + return Impl::defaultClockNow(); + } + + /** + * Change rate and burst size. + * + * Warning: not thread safe! + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void reset(double genRate, double burstSize, double nowInSeconds = defaultClockNow()) noexcept + { + assert(genRate > 0); + assert(burstSize > 0); + const double availTokens = available(nowInSeconds); + rate_ = genRate; + burstSize_ = burstSize; + setCapacity(availTokens, nowInSeconds); + } + + /** + * Change number of tokens in bucket. + * + * Warning: not thread safe! + * + * @param tokens Desired number of tokens in bucket after the call. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void setCapacity(double tokens, double nowInSeconds) noexcept + { + tokenBucket_.reset(nowInSeconds - tokens / rate_); + } + + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeOrDrain(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Returns extra token back to the bucket. + */ + void returnTokens(double tokensToReturn) + { + return tokenBucket_.returnTokens(tokensToReturn, rate_); + } + + /** + * Reserve tokens and return time to wait for in order for the reservation to + * be compatible with the bucket configuration. + */ + boost::optional consumeWithBorrowNonBlocking(double toConsume, + double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeWithBorrowNonBlocking( + toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Reserve tokens. Blocks if need be until reservation is satisfied. + */ + bool consumeWithBorrowAndWait(double toConsume, double nowInSeconds = defaultClockNow()) + { + return tokenBucket_.consumeWithBorrowAndWait(toConsume, rate_, burstSize_, nowInSeconds); + } + + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double nowInSeconds = defaultClockNow()) const + { + return tokenBucket_.available(rate_, burstSize_, nowInSeconds); + } + + /** + * Returns the number of tokens generated per second. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double rate() const noexcept { return rate_; } + + /** + * Returns the maximum burst size. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double burst() const noexcept { return burstSize_; } + +private: + Impl tokenBucket_; + double rate_; + double burstSize_; +}; + +using TokenBucket = BasicTokenBucket<>; +using DynamicTokenBucket = BasicDynamicTokenBucket<>; + +} // namespace folly diff --git a/src/core/tests/TokenBucketTest.cpp b/src/core/tests/TokenBucketTest.cpp new file mode 100644 index 0000000000..81434b9f3d --- /dev/null +++ b/src/core/tests/TokenBucketTest.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "TokenBucketTest.h" +#include +#include + +using namespace folly; + +TEST(TokenBucket, ReverseTime) +{ + const double rate = 1000; + TokenBucket tokenBucket(rate, rate * 0.01 + 1e-6, 0); + size_t count = 0; + while (tokenBucket.consume(1, 0.1)) { + count += 1; + } + EXPECT_EQ(10, count); + // Going backwards in time has no affect on the toke count (this protects + // against different threads providing out of order timestamps). + double tokensBefore = tokenBucket.available(); + EXPECT_FALSE(tokenBucket.consume(1, 0.09999999)); + EXPECT_EQ(tokensBefore, tokenBucket.available()); +} + +TEST_P(TokenBucketTest, sanity) +{ + std::pair params = GetParam(); + double rate = params.first; + double consumeSize = params.second; + + const double tenMillisecondBurst = rate * 0.010; + // Select a burst size of 10 milliseconds at the max rate or the consume size + // if 10 ms at rate is too small. + const double burstSize = std::max(consumeSize, tenMillisecondBurst); + TokenBucket tokenBucket(rate, burstSize, 0); + double tokenCounter = 0; + double currentTime = 0; + // Simulate time advancing 10 seconds + for (; currentTime <= 10.0; currentTime += 0.001) { + EXPECT_FALSE(tokenBucket.consume(burstSize + 1, currentTime)); + while (tokenBucket.consume(consumeSize, currentTime)) { + tokenCounter += consumeSize; + } + // Tokens consumed should exceed some lower bound based on rate. + // Note: The token bucket implementation is not precise, so the lower bound + // is somewhat fudged. The upper bound is accurate however. + EXPECT_LE(rate * currentTime * 0.9 - 1, tokenCounter); + // Tokens consumed should not exceed some upper bound based on rate. + EXPECT_GE(rate * currentTime + 1e-6, tokenCounter); + } +} + +static std::vector> rateToConsumeSize = { + {100, 1}, {1000, 1}, {10000, 1}, {10000, 5}, +}; + +INSTANTIATE_TEST_CASE_P(TokenBucket, TokenBucketTest, ::testing::ValuesIn(rateToConsumeSize)); + +TEST(TokenBucket, drainOnFail) +{ + DynamicTokenBucket tokenBucket; + + // Almost empty the bucket + EXPECT_TRUE(tokenBucket.consume(9, 10, 10, 1)); + + // Request more tokens than available + EXPECT_FALSE(tokenBucket.consume(5, 10, 10, 1)); + EXPECT_DOUBLE_EQ(1.0, tokenBucket.available(10, 10, 1)); + + // Again request more tokens than available, but ask to drain + EXPECT_DOUBLE_EQ(1.0, tokenBucket.consumeOrDrain(5, 10, 10, 1)); + EXPECT_DOUBLE_EQ(0.0, tokenBucket.consumeOrDrain(1, 10, 10, 1)); +} + +TEST(TokenBucket, returnTokensTest) +{ + DynamicTokenBucket tokenBucket; + + // Empty the bucket. + EXPECT_TRUE(tokenBucket.consume(10, 10, 10, 5)); + // consume should fail now. + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 5)); + EXPECT_DOUBLE_EQ(0.0, tokenBucket.consumeOrDrain(1, 10, 10, 5)); + + // Return tokens. Return 40 'excess' tokens but they wont be available to + // later callers. + tokenBucket.returnTokens(50, 10); + // Should be able to allocate 10 tokens again but the extra 40 returned in + // previous call are gone. + EXPECT_TRUE(tokenBucket.consume(10, 10, 10, 5)); + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 5)); +} + +TEST(TokenBucket, consumeOrBorrowTest) +{ + DynamicTokenBucket tokenBucket; + + // Empty the bucket. + EXPECT_TRUE(tokenBucket.consume(10, 10, 10, 1)); + // consume should fail now. + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 1)); + // Now borrow from future allocations. Each call is asking for 1s worth of + // allocations so it should return (i+1)*1s in the ith iteration as the time + // caller needs to wait. + for (int i = 0; i < 10; ++i) { + auto waitTime = tokenBucket.consumeWithBorrowNonBlocking(10, 10, 10, 1); + EXPECT_TRUE(waitTime.is_initialized()); + EXPECT_DOUBLE_EQ((i + 1) * 1.0, *waitTime); + } + + // No allocation will succeed until nowInSeconds goes higher than 11s. + EXPECT_FALSE(tokenBucket.consume(1, 10, 10, 11)); +} diff --git a/src/core/tests/TokenBucketTest.h b/src/core/tests/TokenBucketTest.h new file mode 100644 index 0000000000..2a500a3664 --- /dev/null +++ b/src/core/tests/TokenBucketTest.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include + +namespace folly { + +struct TokenBucketTest : public ::testing::TestWithParam> +{ +}; + +} // namespace folly