Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared, global RLQS client & buckets cache #34009

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions source/extensions/filters/http/rate_limit_quota/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ envoy_cc_library(
"//source/extensions/matching/input_matchers/cel_matcher:config",
"@envoy_api//envoy/extensions/filters/http/rate_limit_quota/v3:pkg_cc_proto",
"@envoy_api//envoy/service/rate_limit_quota/v3:pkg_cc_proto",
"@envoy_api//envoy/type/v3:pkg_cc_proto",
],
)

Expand All @@ -38,41 +39,48 @@ envoy_cc_extension(
srcs = ["config.cc"],
hdrs = ["config.h"],
deps = [
":client_interface",
":global_client_lib",
":rate_limit_quota",
"//envoy/grpc:async_client_manager_interface",
"//envoy/registry",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/rate_limit_quota/v3:pkg_cc_proto",
"@envoy_api//envoy/type/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_interface",
hdrs = ["client.h"],
name = "client_lib",
srcs = ["client_impl.cc"],
hdrs = ["client_impl.h"],
deps = [
":global_client_lib",
":quota_bucket_cache",
"//envoy/grpc:async_client_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"@envoy_api//envoy/extensions/filters/http/rate_limit_quota/v3:pkg_cc_proto",
"@envoy_api//envoy/service/rate_limit_quota/v3:pkg_cc_proto",
"//source/common/common:statusor_lib",
"//source/common/grpc:typed_async_client_lib",
"//source/common/tracing:http_tracer_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/type/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "client_lib",
srcs = ["client_impl.cc"],
hdrs = ["client_impl.h"],
name = "global_client_lib",
srcs = ["global_client_impl.cc"],
hdrs = ["global_client_impl.h"],
deps = [
":client_interface",
":quota_bucket_cache",
"//envoy/grpc:async_client_interface",
"//envoy/grpc:status",
"//source/common/common:statusor_lib",
"//source/common/grpc:typed_async_client_lib",
"//source/common/tracing:http_tracer_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/rate_limit_quota/v3:pkg_cc_proto",
"@envoy_api//envoy/type/v3:pkg_cc_proto",
],
)

Expand All @@ -98,8 +106,6 @@ envoy_cc_library(
name = "quota_bucket_cache",
hdrs = ["quota_bucket_cache.h"],
deps = [
":client_interface",
"//envoy/common:time_interface",
"//source/common/common:token_bucket_impl_lib",
"//source/common/http:headers_lib",
"//source/common/http:utility_lib",
Expand Down
51 changes: 0 additions & 51 deletions source/extensions/filters/http/rate_limit_quota/client.h

This file was deleted.

217 changes: 26 additions & 191 deletions source/extensions/filters/http/rate_limit_quota/client_impl.cc
Original file line number Diff line number Diff line change
@@ -1,204 +1,39 @@
#include "source/extensions/filters/http/rate_limit_quota/client_impl.h"

#include "source/common/tracing/http_tracer_impl.h"
#include <cstddef>
#include <memory>

#include "envoy/type/v3/ratelimit_strategy.pb.h"
#include "envoy/type/v3/token_bucket.pb.h"

#include "source/common/common/logger.h"
#include "source/extensions/filters/http/rate_limit_quota/global_client_impl.h"
#include "source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace RateLimitQuota {

Grpc::RawAsyncClientSharedPtr
getOrThrow(absl::StatusOr<Grpc::RawAsyncClientSharedPtr> client_or_error) {
THROW_IF_NOT_OK_REF(client_or_error.status());
return client_or_error.value();
}

RateLimitClientImpl::RateLimitClientImpl(
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
Server::Configuration::FactoryContext& context, absl::string_view domain_name,
RateLimitQuotaCallbacks* callbacks, BucketsCache& quota_buckets)
: domain_name_(domain_name),
aync_client_(getOrThrow(
context.serverFactoryContext()
.clusterManager()
.grpcAsyncClientManager()
.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, context.scope(), true))),
rlqs_callback_(callbacks), quota_buckets_(quota_buckets),
time_source_(context.serverFactoryContext().mainThreadDispatcher().timeSource()) {}

RateLimitQuotaUsageReports RateLimitClientImpl::buildReport(absl::optional<size_t> bucket_id) {
RateLimitQuotaUsageReports report;
// Build the report from quota bucket cache.
for (const auto& [id, bucket] : quota_buckets_) {
auto* usage = report.add_bucket_quota_usages();
*usage->mutable_bucket_id() = bucket->bucket_id;
usage->set_num_requests_allowed(bucket->quota_usage.num_requests_allowed);
usage->set_num_requests_denied(bucket->quota_usage.num_requests_denied);

auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(
time_source_.monotonicTime().time_since_epoch());
// For the newly created bucket (i.e., `bucket_id` input is not null), its time
// elapsed since last report is 0.
// This case happens when we send the report to RLQS server immediately.
if (bucket_id.has_value() && bucket_id.value() == id) {
*usage->mutable_time_elapsed() = Protobuf::util::TimeUtil::NanosecondsToDuration(0);
} else {
*usage->mutable_time_elapsed() = Protobuf::util::TimeUtil::NanosecondsToDuration(
(now - bucket->quota_usage.last_report).count());
}

// Update the last_report time point.
bucket->quota_usage.last_report = now;
// Reset the number of request allowed/denied. The RLQS server expects the client to report
// those two usage numbers only for last report period.
bucket->quota_usage.num_requests_allowed = 0;
bucket->quota_usage.num_requests_denied = 0;
}

// Set the domain name.
report.set_domain(domain_name_);
ENVOY_LOG(debug, "The usage report that will be sent to RLQS server:\n{}", report.DebugString());
return report;
}

// This function covers both periodical report and immediate report case, with the difference that
// bucked id in periodical report case is empty.
void RateLimitClientImpl::sendUsageReport(absl::optional<size_t> bucket_id) {
if (stream_ == nullptr) {
ENVOY_LOG(debug, "The RLQS stream has been closed and must be restarted to send reports.");
if (absl::Status err = startStream(nullptr); !err.ok()) {
ENVOY_LOG(error, "Failed to start the stream to send reports.");
return;
}
}

// Build the report and then send the report to RLQS server.
// `end_stream` should always be set to false as we don't want to close the stream locally.
stream_->sendMessage(buildReport(bucket_id), /*end_stream=*/false);
using BucketAction = RateLimitQuotaResponse::BucketAction;

void LocalRateLimitClientImpl::createBucket(
const BucketId& bucket_id, size_t id, const BucketAction& default_bucket_action,
std::unique_ptr<envoy::type::v3::RateLimitStrategy> fallback_action,
std::chrono::milliseconds fallback_ttl, bool initial_request_allowed) {
std::shared_ptr<GlobalRateLimitClientImpl> global_client = getGlobalClient();
// Intentionally crash if the local client is initialized with a null global
// client or TLS slot due to a bug.
global_client->createBucket(bucket_id, id, default_bucket_action, std::move(fallback_action),
fallback_ttl, initial_request_allowed);
}

void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) {
ENVOY_LOG(debug, "The response that is received from RLQS server:\n{}", response->DebugString());
for (const auto& action : response->bucket_action()) {
if (!action.has_bucket_id() || action.bucket_id().bucket().empty()) {
ENVOY_LOG(error,
"Received a response, but bucket_id is missing : ", response->ShortDebugString());
continue;
}

// Get the hash id value from BucketId in the response.
const size_t bucket_id = MessageUtil::hash(action.bucket_id());
ENVOY_LOG(debug,
"Received a response for bucket id proto :\n {}, and generated "
"the associated hashed bucket id: {}",
action.bucket_id().DebugString(), bucket_id);
if (quota_buckets_.find(bucket_id) == quota_buckets_.end()) {
// The response should be matched to the report we sent.
ENVOY_LOG(error, "The received response is not matched to any quota cache entry: ",
response->ShortDebugString());
} else {
switch (action.bucket_action_case()) {
case envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse_BucketAction::
kQuotaAssignmentAction: {
absl::optional<BucketAction> cached_action = quota_buckets_[bucket_id]->cached_action;
quota_buckets_[bucket_id]->current_assignment_time = time_source_.monotonicTime();

if (cached_action.has_value() &&
Protobuf::util::MessageDifferencer::Equals(*cached_action, action)) {
ENVOY_LOG(debug,
"Cached action matches the incoming response so only TTL is updated for bucket "
"id: {}",
bucket_id);
break;
}
quota_buckets_[bucket_id]->cached_action = action;
if (quota_buckets_[bucket_id]->cached_action->has_quota_assignment_action()) {
auto rate_limit_strategy = quota_buckets_[bucket_id]
->cached_action->quota_assignment_action()
.rate_limit_strategy();

if (rate_limit_strategy.has_token_bucket()) {
const auto& interval_proto = rate_limit_strategy.token_bucket().fill_interval();
// Convert absl::duration to int64_t seconds
int64_t fill_interval_sec =
absl::ToInt64Seconds(absl::Seconds(interval_proto.seconds()) +
absl::Nanoseconds(interval_proto.nanos()));
double fill_rate_per_sec =
static_cast<double>(rate_limit_strategy.token_bucket().tokens_per_fill().value()) /
fill_interval_sec;
uint32_t max_tokens = rate_limit_strategy.token_bucket().max_tokens();
ENVOY_LOG(trace,
"Created the token bucket limiter for hashed bucket "
"id: {}, with max_tokens: {}; "
"fill_interval_sec: {}; fill_rate_per_sec: {}.",
bucket_id, max_tokens, fill_interval_sec, fill_rate_per_sec);
quota_buckets_[bucket_id]->token_bucket_limiter =
std::make_unique<TokenBucketImpl>(max_tokens, time_source_, fill_rate_per_sec);
}
}
break;
}
case envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse_BucketAction::
kAbandonAction: {
quota_buckets_.erase(bucket_id);
ENVOY_LOG(debug, "Bucket id {} removed from the cache by abandon action.", bucket_id);
break;
}
default: {
ENVOY_LOG_EVERY_POW_2(error, "Unset bucket action type {}",
static_cast<int>(action.bucket_action_case()));
break;
}
}
}
ENVOY_LOG(debug, "Assignment cached for bucket id {}.", bucket_id);
}

// `rlqs_callback_` has been reset to nullptr for periodical report case.
// No need to invoke onQuotaResponse to continue the filter chain for this case as filter chain
// has not been paused.
if (rlqs_callback_ != nullptr) {
rlqs_callback_->onQuotaResponse(*response);
}
}

void RateLimitClientImpl::closeStream() {
// Close the stream if it is in open state.
if (stream_ != nullptr) {
ENVOY_LOG(debug, "Closing gRPC stream");
stream_->closeStream();
stream_->resetStream();
stream_ = nullptr;
}
}

void RateLimitClientImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
const std::string& message) {
ENVOY_LOG(debug, "gRPC stream closed remotely with status {}: {}", status, message);
stream_ = nullptr;
}

absl::Status RateLimitClientImpl::startStream(const StreamInfo::StreamInfo* stream_info) {
// Starts stream if it has not been opened yet.
if (stream_ == nullptr) {
ENVOY_LOG(debug, "Trying to start the new gRPC stream");
auto stream_options = Http::AsyncClient::RequestOptions();
if (stream_info) {
stream_options.setParentContext(Http::AsyncClient::ParentContext{stream_info});
}
stream_ = aync_client_.start(
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.rate_limit_quota.v3.RateLimitQuotaService.StreamRateLimitQuotas"),
*this, stream_options);
}

// If still null after attempting a start.
if (stream_ == nullptr) {
return absl::InternalError("Failed to start the stream");
}

ENVOY_LOG(debug, "gRPC stream has been started");
return absl::OkStatus();
std::shared_ptr<CachedBucket> LocalRateLimitClientImpl::getBucket(size_t id) {
std::shared_ptr<BucketsCache> buckets_cache = getBucketsCache();
// Intentionally crash if the client is initialized with a null global cache
// or TLS slot due to a bug.
auto bucket_it = buckets_cache->find(id);
return (bucket_it != buckets_cache->end()) ? bucket_it->second : nullptr;
}

} // namespace RateLimitQuota
Expand Down
Loading