Skip to content

Commit

Permalink
Merge b76e3d7 into 9d14e5e
Browse files Browse the repository at this point in the history
  • Loading branch information
mathetake authored Dec 18, 2024
2 parents 9d14e5e + b76e3d7 commit 5aa40f0
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 52 deletions.
19 changes: 18 additions & 1 deletion api/envoy/config/route/v3/route_components.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1868,7 +1868,7 @@ message VirtualCluster {

// Global rate limiting :ref:`architecture overview <arch_overview_global_rate_limit>`.
// Also applies to Local rate limiting :ref:`using descriptors <config_http_filters_local_rate_limit_descriptors>`.
// [#next-free-field: 6]
// [#next-free-field: 7]
message RateLimit {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.route.RateLimit";

Expand Down Expand Up @@ -2245,6 +2245,23 @@ message RateLimit {
// :ref:`VirtualHost.typed_per_filter_config<envoy_v3_api_field_config.route.v3.VirtualHost.typed_per_filter_config>` or
// :ref:`Route.typed_per_filter_config<envoy_v3_api_field_config.route.v3.Route.typed_per_filter_config>`, etc.
HitsAddend hits_addend = 5;

// If true, the rate limit request will be applied when the stream completes. The default value is false.
// This is useful when the rate limit budget needs to reflect the response context that is not available
// on the request path.
//
// For example, let's say the upstream service calculates the usage statistics and returns them in the response body
// and we want to utilize these numbers to apply the rate limit action for the subsequent requests.
// Combined with another filter that can set the desired addend based on the response (e.g. Lua filter),
// this can be used to subtract the usage statistics from the rate limit budget.
//
// A rate limit applied on the stream completion is "fire-and-forget" by nature, and rate limit is not enforced by this config.
// In other words, the current request won't be blocked when this is true, but the budget will be updated for the subsequent
// requests based on the action with this field set to true. Users should ensure that the rate limit is enforced by the actions
// applied on the request path, i.e. the ones with this field set to false.
//
// Currently, this is only supported by the HTTP global rate filter.
bool apply_on_stream_done = 6;
}

// .. attention::
Expand Down
5 changes: 5 additions & 0 deletions envoy/router/router_ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class RateLimitPolicyEntry {
*/
virtual const std::string& disableKey() const PURE;

/**
* @return true if this rate limit policy should be applied on stream done.
*/
virtual bool applyOnStreamDone() const PURE;

/**
* Potentially populate the descriptor array with new descriptors to query.
* @param descriptors supplies the descriptor array to optionally fill.
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router_ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ RateLimitPolicyEntryImpl::RateLimitPolicyEntryImpl(
const envoy::config::route::v3::RateLimit& config,
Server::Configuration::CommonFactoryContext& context, absl::Status& creation_status)
: disable_key_(config.disable_key()),
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))) {
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))),
apply_on_stream_done_(config.apply_on_stream_done()) {
for (const auto& action : config.actions()) {
switch (action.action_specifier_case()) {
case envoy::config::route::v3::RateLimit::Action::ActionSpecifierCase::kSourceCluster:
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router_ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,14 @@ class RateLimitPolicyEntryImpl : public RateLimitPolicyEntry {
const std::string& local_service_cluster,
const Http::RequestHeaderMap&,
const StreamInfo::StreamInfo& info) const override;
bool applyOnStreamDone() const override { return apply_on_stream_done_; }

private:
const std::string disable_key_;
uint64_t stage_;
std::vector<RateLimit::DescriptorProducerPtr> actions_;
absl::optional<RateLimitOverrideActionPtr> limit_override_ = absl::nullopt;
const bool apply_on_stream_done_ = false;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/common/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Client {
*/
virtual void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend) PURE;
};

Expand Down
25 changes: 16 additions & 9 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ void GrpcClientImpl::createRequest(envoy::service::ratelimit::v3::RateLimitReque

void GrpcClientImpl::limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
uint32_t hits_addend) {
Tracing::Span& parent_span,
OptRef<const StreamInfo::StreamInfo> stream_info, uint32_t hits_addend) {
ASSERT(callbacks_ == nullptr);
callbacks_ = &callbacks;

envoy::service::ratelimit::v3::RateLimitRequest request;
createRequest(request, domain, descriptors, hits_addend);

request_ =
async_client_->send(service_method_, request, *this, parent_span,
Http::AsyncClient::RequestOptions().setTimeout(timeout_).setParentContext(
Http::AsyncClient::ParentContext{&stream_info}));
auto options = Http::AsyncClient::RequestOptions().setTimeout(timeout_);
if (stream_info.has_value()) {
options.setParentContext(Http::AsyncClient::ParentContext{stream_info.ptr()});
}
request_ = async_client_->send(service_method_, request, *this, parent_span, options);
}

void GrpcClientImpl::onSuccess(
Expand Down Expand Up @@ -107,19 +108,25 @@ void GrpcClientImpl::onSuccess(
response->has_dynamic_metadata()
? std::make_unique<ProtobufWkt::Struct>(response->dynamic_metadata())
: nullptr;
callbacks_->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add),
// The rate limit requests applied on stream-done will destroy the client inside the complete
// callback, so we release the callback here to make the destructor happy.
auto call_backs = callbacks_;
callbacks_ = nullptr;
call_backs->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add),
std::move(request_headers_to_add), response->raw_body(),
std::move(dynamic_metadata));
callbacks_ = nullptr;
}

void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::string& msg,
Tracing::Span&) {
ASSERT(status != Grpc::Status::WellKnownGrpcStatus::Ok);
ENVOY_LOG_TO_LOGGER(Logger::Registry::getLog(Logger::Id::filter), debug,
"rate limit fail, status={} msg={}", status, msg);
callbacks_->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
// The rate limit requests applied on stream-done will destroy the client inside the complete
// callback, so we release the callback here to make the destructor happy.
auto call_backs = callbacks_;
callbacks_ = nullptr;
call_backs->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
}

ClientPtr rateLimitClient(Server::Configuration::FactoryContext& context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GrpcClientImpl : public Client,
void cancel() override;
void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend = 0) override;

// Grpc::AsyncRequestCallbacks
Expand Down
95 changes: 65 additions & 30 deletions source/extensions/filters/http/ratelimit/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,52 +55,66 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) {
return;
}

Router::RouteConstSharedPtr route = callbacks_->route();
if (!route || !route->routeEntry()) {
return;
std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, headers, false);
if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), getHitAddend());
initiating_call_ = false;
}
}

cluster_ = callbacks_->clusterInfo();
if (!cluster_) {
void Filter::populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
if (!on_stream_done) {
// To use the exact same context for both request and on_stream_done rate limiting descriptors,
// we save the route and per-route configuration here and use them later.
route_ = callbacks_->route();
cluster_ = callbacks_->clusterInfo();
}
if (!route_ || !cluster_) {
return;
}

std::vector<Envoy::RateLimit::Descriptor> descriptors;

const Router::RouteEntry* route_entry = route->routeEntry();
const Router::RouteEntry* route_entry = route_->routeEntry();
if (!route_entry) {
return;
}
if (!on_stream_done) {
initializeVirtualHostRateLimitOption(route_entry);
}
// Get all applicable rate limit policy entries for the route.
populateRateLimitDescriptors(route_entry->rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_entry->rateLimitPolicy(), descriptors, headers,
on_stream_done);

VhRateLimitOptions vh_rate_limit_option = getVirtualHostRateLimitOption(route);

switch (vh_rate_limit_option) {
switch (vh_rate_limits_) {
case VhRateLimitOptions::Ignore:
break;
case VhRateLimitOptions::Include:
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
break;
case VhRateLimitOptions::Override:
if (route_entry->rateLimitPolicy().empty()) {
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
}
break;
}
}

double Filter::getHitAddend() {
const StreamInfo::UInt32Accessor* hits_addend_filter_state =
callbacks_->streamInfo().filterState()->getDataReadOnly<StreamInfo::UInt32Accessor>(
HitsAddendFilterStateKey);
double hits_addend = 0;
if (hits_addend_filter_state != nullptr) {
hits_addend = hits_addend_filter_state->value();
}

if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), hits_addend);
initiating_call_ = false;
}
return hits_addend;
}

Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool) {
Expand Down Expand Up @@ -161,6 +175,16 @@ void Filter::onDestroy() {
if (state_ == State::Calling) {
state_ = State::Complete;
client_->cancel();
} else if (client_ != nullptr) {
std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, *request_headers_, true);
if (!descriptors.empty()) {
// Since this filter is being destroyed, we need to keep the client alive until the request
// is complete by leaking the client with OnStreamDoneCallBack.
auto callback = new OnStreamDoneCallBack(std::move(client_));
callback->client().limit(*callback, getDomain(), descriptors, Tracing::NullSpan::instance(),
absl::nullopt, getHitAddend());
}
}
}

Expand Down Expand Up @@ -256,9 +280,10 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
}
}

void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const {
void Filter::populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
for (const Router::RateLimitPolicyEntry& rate_limit :
rate_limit_policy.getApplicableRateLimit(config_->stage())) {
const std::string& disable_key = rate_limit.disableKey();
Expand All @@ -267,8 +292,11 @@ void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_li
fmt::format("ratelimit.{}.http_filter_enabled", disable_key), 100)) {
continue;
}
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
const bool apply_on_stream_done = rate_limit.applyOnStreamDone();
if (on_stream_done == apply_on_stream_done) {
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
}
}
}

Expand Down Expand Up @@ -296,8 +324,8 @@ void Filter::appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add) {
}
}

VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route) {
if (route->routeEntry()->includeVirtualHostRateLimits()) {
void Filter::initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry) {
if (route_entry->includeVirtualHostRateLimits()) {
vh_rate_limits_ = VhRateLimitOptions::Include;
} else {
const auto* specific_per_route_config =
Expand All @@ -318,7 +346,6 @@ VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteCons
vh_rate_limits_ = VhRateLimitOptions::Override;
}
}
return vh_rate_limits_;
}

std::string Filter::getDomain() {
Expand All @@ -330,6 +357,14 @@ std::string Filter::getDomain() {
return config_->domain();
}

void OnStreamDoneCallBack::complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&,
Http::ResponseHeaderMapPtr&&, Http::RequestHeaderMapPtr&&,
const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) {
delete this;
}

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
Expand Down
34 changes: 30 additions & 4 deletions source/extensions/filters/http/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,16 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req

private:
void initiateCall(const Http::RequestHeaderMap& headers);
void populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const;
void populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers, bool on_stream_done);
void populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done);
void populateResponseHeaders(Http::HeaderMap& response_headers, bool from_local_reply);
void appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add);
VhRateLimitOptions getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route);
double getHitAddend();
void initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry);
std::string getDomain();

Http::Context& httpContext() { return config_->httpContext(); }
Expand All @@ -203,11 +207,33 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req
State state_{State::NotStarted};
VhRateLimitOptions vh_rate_limits_;
Upstream::ClusterInfoConstSharedPtr cluster_;
Router::RouteConstSharedPtr route_ = nullptr;
bool initiating_call_{};
Http::ResponseHeaderMapPtr response_headers_to_add_;
Http::RequestHeaderMap* request_headers_{};
};

/**
* This implements the rate limit callback that outlives the filter holding the client.
* On completion, it deletes itself.
*/
class OnStreamDoneCallBack : public Filters::Common::RateLimit::RequestCallbacks {
public:
OnStreamDoneCallBack(Filters::Common::RateLimit::ClientPtr client) : client_(std::move(client)) {}
~OnStreamDoneCallBack() override = default;

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&, Http::ResponseHeaderMapPtr&&,
Http::RequestHeaderMapPtr&&, const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) override;

Filters::Common::RateLimit::Client& client() { return *client_; }

private:
Filters::Common::RateLimit::ClientPtr client_;
};

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/common/ratelimit/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class MockClient : public Client {
MOCK_METHOD(void, limit,
(RequestCallbacks & callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend));
};

Expand Down
Loading

0 comments on commit 5aa40f0

Please sign in to comment.