http ratelimit: option to reduce budget on stream done #37548

Merged · 30 commits · Dec 19, 2024
Commits
9deac5f  ratelimit: option to execute action on stream done (mathetake, Dec 6, 2024)
5fe0370  format (mathetake, Dec 6, 2024)
32ea3e9  more clarify (mathetake, Dec 6, 2024)
e346217  clarify more (mathetake, Dec 7, 2024)
c8e0b73  Apply feedback in offline: action is either request or response, not … (mathetake, Dec 9, 2024)
6166b90  Apply offline review: remove the mention of envoy.ratelimit.hits_adde… (mathetake, Dec 9, 2024)
cab0154  more comments (mathetake, Dec 9, 2024)
371e4c6  Put it in the correct place (mathetake, Dec 9, 2024)
7e77a7f  Filter config level (mathetake, Dec 9, 2024)
8f29563  mention envoy.ratelimit.hits_addend (mathetake, Dec 11, 2024)
a02e932  Merge remote-tracking branch 'origin/main' into actionapplyondone (mathetake, Dec 11, 2024)
6598d44  Add more comments (mathetake, Dec 11, 2024)
59c2d1d  impl (mathetake, Dec 11, 2024)
3047603  format (mathetake, Dec 11, 2024)
c812eec  adds tests (mathetake, Dec 11, 2024)
d4448c4  ok (mathetake, Dec 12, 2024)
7b9bd57  simplifies (mathetake, Dec 12, 2024)
011a816  more (mathetake, Dec 12, 2024)
57564f4  review: flag on descriptor/policy level (mathetake, Dec 12, 2024)
146af55  more (mathetake, Dec 12, 2024)
702e427  reviews: apply comments (mathetake, Dec 17, 2024)
5aaa9c1  Merge remote-tracking branch 'origin/main' into actionapplyondone (mathetake, Dec 17, 2024)
f3c7b32  review: get descriptors in done (mathetake, Dec 18, 2024)
74a6cc9  typo (mathetake, Dec 18, 2024)
0ba3b25  remove unnecessary change (mathetake, Dec 18, 2024)
8823652  Apply review comments (mathetake, Dec 18, 2024)
946ce6f  review: save route_ and per-route vh_rate_limits_ for consistency (mathetake, Dec 18, 2024)
b100c6d  Merge remote-tracking branch 'origin/main' into actionapplyondone (mathetake, Dec 18, 2024)
5dec975  simplify (mathetake, Dec 18, 2024)
b76e3d7  apply review comments (mathetake, Dec 18, 2024)
19 changes: 18 additions & 1 deletion api/envoy/config/route/v3/route_components.proto
@@ -1868,7 +1868,7 @@ message VirtualCluster {

// Global rate limiting :ref:`architecture overview <arch_overview_global_rate_limit>`.
// Also applies to Local rate limiting :ref:`using descriptors <config_http_filters_local_rate_limit_descriptors>`.
// [#next-free-field: 6]
// [#next-free-field: 7]
message RateLimit {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.route.RateLimit";

@@ -2245,6 +2245,23 @@ message RateLimit {
// :ref:`VirtualHost.typed_per_filter_config<envoy_v3_api_field_config.route.v3.VirtualHost.typed_per_filter_config>` or
// :ref:`Route.typed_per_filter_config<envoy_v3_api_field_config.route.v3.Route.typed_per_filter_config>`, etc.
HitsAddend hits_addend = 5;

// If true, the rate limit request will be applied when the stream completes. The default value is false.
// This is useful when the rate limit budget needs to reflect the response context that is not available
// on the request path.
//
// For example, suppose the upstream service calculates usage statistics and returns them in the response body,
// and we want to use these numbers to adjust the rate limit budget for subsequent requests.
// Combined with another filter that can set the desired addend based on the response (e.g. Lua filter),
// this can be used to subtract the usage statistics from the rate limit budget.
//
// A rate limit applied on stream completion is "fire-and-forget" by nature, and the rate limit is not enforced by this config.
// In other words, the current request won't be blocked when this is true, but the budget will be updated for the subsequent
// requests based on the action with this field set to true. Users should ensure that the rate limit is enforced by the actions
// applied on the request path, i.e. the ones with this field set to false.
//
// Currently, this is only supported by the HTTP global rate limit filter.
bool apply_on_stream_done = 6;
}

// .. attention::
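For illustration, a minimal sketch of a policy entry that uses the new field, built through the generated C++ proto API; the `generic_key` action and its descriptor value are placeholders, not from this PR, and a real policy would pair this entry with a request-path entry (with `apply_on_stream_done` left unset) that actually enforces the limit:

```cpp
#include "envoy/config/route/v3/route_components.pb.h"

// Hypothetical helper: builds a RateLimit entry whose budget update is deferred to stream
// completion. The action below is a placeholder; real configs choose whatever descriptors fit.
envoy::config::route::v3::RateLimit makeStreamDoneRateLimit() {
  envoy::config::route::v3::RateLimit rate_limit;
  // Fire-and-forget: this entry only reduces the budget after the response has completed.
  rate_limit.set_apply_on_stream_done(true);
  rate_limit.add_actions()->mutable_generic_key()->set_descriptor_value("usage_billing");
  return rate_limit;
}
```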
5 changes: 5 additions & 0 deletions envoy/router/router_ratelimit.h
@@ -48,6 +48,11 @@ class RateLimitPolicyEntry {
*/
virtual const std::string& disableKey() const PURE;

/**
* @return true if this rate limit policy should be applied on stream done.
*/
virtual bool applyOnStreamDone() const PURE;

/**
* Potentially populate the descriptor array with new descriptors to query.
* @param descriptors supplies the descriptor array to optionally fill.
3 changes: 2 additions & 1 deletion source/common/router/router_ratelimit.cc
@@ -267,7 +267,8 @@ RateLimitPolicyEntryImpl::RateLimitPolicyEntryImpl(
const envoy::config::route::v3::RateLimit& config,
Server::Configuration::CommonFactoryContext& context, absl::Status& creation_status)
: disable_key_(config.disable_key()),
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))) {
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))),
apply_on_stream_done_(config.apply_on_stream_done()) {
for (const auto& action : config.actions()) {
switch (action.action_specifier_case()) {
case envoy::config::route::v3::RateLimit::Action::ActionSpecifierCase::kSourceCluster:
2 changes: 2 additions & 0 deletions source/common/router/router_ratelimit.h
@@ -256,12 +256,14 @@ class RateLimitPolicyEntryImpl : public RateLimitPolicyEntry {
const std::string& local_service_cluster,
const Http::RequestHeaderMap&,
const StreamInfo::StreamInfo& info) const override;
bool applyOnStreamDone() const override { return apply_on_stream_done_; }

private:
const std::string disable_key_;
uint64_t stage_;
std::vector<RateLimit::DescriptorProducerPtr> actions_;
absl::optional<RateLimitOverrideActionPtr> limit_override_ = absl::nullopt;
const bool apply_on_stream_done_ = false;
};

/**
2 changes: 1 addition & 1 deletion source/extensions/filters/common/ratelimit/ratelimit.h
@@ -90,7 +90,7 @@ class Client {
*/
virtual void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend) PURE;
};

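The switch to `OptRef` exists because the stream-done request may be issued while the stream is being torn down, so there may be no stream context to attach. A hedged sketch of the two call shapes (the function names, domain, and includes are illustrative, not from this PR):

```cpp
#include <vector>

#include "absl/types/optional.h"
#include "envoy/stream_info/stream_info.h"
#include "source/common/tracing/null_span_impl.h"
#include "source/extensions/filters/common/ratelimit/ratelimit.h"

namespace Envoy {

// Request path: the live stream's StreamInfo is forwarded as the parent context.
void limitOnRequestPath(Extensions::Filters::Common::RateLimit::Client& client,
                        Extensions::Filters::Common::RateLimit::RequestCallbacks& callbacks,
                        const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
                        Tracing::Span& active_span, const StreamInfo::StreamInfo& stream_info) {
  client.limit(callbacks, "example_domain", descriptors, active_span, stream_info,
               /*hits_addend=*/0);
}

// Stream-done path: the stream is going away, so a null span and an empty OptRef are passed.
void limitOnStreamDone(Extensions::Filters::Common::RateLimit::Client& client,
                       Extensions::Filters::Common::RateLimit::RequestCallbacks& callbacks,
                       const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
                       uint32_t hits_addend) {
  client.limit(callbacks, "example_domain", descriptors, Tracing::NullSpan::instance(),
               absl::nullopt, hits_addend);
}

} // namespace Envoy
```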
25 changes: 16 additions & 9 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
@@ -59,18 +59,19 @@ void GrpcClientImpl::createRequest(envoy::service::ratelimit::v3::RateLimitReque

void GrpcClientImpl::limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
uint32_t hits_addend) {
Tracing::Span& parent_span,
OptRef<const StreamInfo::StreamInfo> stream_info, uint32_t hits_addend) {
ASSERT(callbacks_ == nullptr);
callbacks_ = &callbacks;

envoy::service::ratelimit::v3::RateLimitRequest request;
createRequest(request, domain, descriptors, hits_addend);

request_ =
async_client_->send(service_method_, request, *this, parent_span,
Http::AsyncClient::RequestOptions().setTimeout(timeout_).setParentContext(
Http::AsyncClient::ParentContext{&stream_info}));
auto options = Http::AsyncClient::RequestOptions().setTimeout(timeout_);
if (stream_info.has_value()) {
options.setParentContext(Http::AsyncClient::ParentContext{stream_info.ptr()});
}
request_ = async_client_->send(service_method_, request, *this, parent_span, options);
}

void GrpcClientImpl::onSuccess(
@@ -107,19 +108,25 @@ void GrpcClientImpl::onSuccess(
response->has_dynamic_metadata()
? std::make_unique<ProtobufWkt::Struct>(response->dynamic_metadata())
: nullptr;
callbacks_->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add),
// The rate limit requests applied on stream-done will destroy the client inside the complete
// callback, so we release the callback here to make the destructor happy.
auto call_backs = callbacks_;
callbacks_ = nullptr;
call_backs->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add),
std::move(request_headers_to_add), response->raw_body(),
std::move(dynamic_metadata));
callbacks_ = nullptr;
}

void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::string& msg,
Tracing::Span&) {
ASSERT(status != Grpc::Status::WellKnownGrpcStatus::Ok);
ENVOY_LOG_TO_LOGGER(Logger::Registry::getLog(Logger::Id::filter), debug,
"rate limit fail, status={} msg={}", status, msg);
callbacks_->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
// The rate limit requests applied on stream-done will destroy the client inside the complete
// callback, so we release the callback here to make the destructor happy.
auto call_backs = callbacks_;
callbacks_ = nullptr;
call_backs->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
}

ClientPtr rateLimitClient(Server::Configuration::FactoryContext& context,
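The reordering in `onSuccess()` and `onFailure()` matters because, on the stream-done path, the callback owns the client and destroys it inside `complete()`; assigning to `callbacks_` afterwards would write into a freed object. A minimal, Envoy-independent sketch of the pattern (all names here are hypothetical):

```cpp
#include <memory>

// The completion callback may destroy the client, so the client must detach its callback
// pointer first and touch nothing after notifying.
class Callbacks {
public:
  virtual ~Callbacks() = default;
  virtual void complete() = 0;
};

class Client {
public:
  void start(Callbacks& callbacks) { callbacks_ = &callbacks; }

  void finish() {
    Callbacks* cb = callbacks_;
    callbacks_ = nullptr; // detach before notifying; `this` may be gone once complete() returns
    cb->complete();
  }

private:
  Callbacks* callbacks_{nullptr};
};

// Owns the client and deletes itself (and therefore the client) on completion, mirroring the
// role OnStreamDoneCallBack plays in the HTTP filter later in this PR.
class SelfDeletingCallback : public Callbacks {
public:
  explicit SelfDeletingCallback(std::unique_ptr<Client> client) : client_(std::move(client)) {}
  Client& client() { return *client_; }
  void complete() override { delete this; }

private:
  std::unique_ptr<Client> client_;
};

int main() {
  auto* cb = new SelfDeletingCallback(std::make_unique<Client>());
  cb->client().start(*cb);
  cb->client().finish(); // safe: the client detached its callback before notifying
  return 0;
}
```

The following hunk mirrors this change in the gRPC client's header, updating the `limit()` declaration to the `OptRef` signature.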
@@ -57,7 +57,7 @@ class GrpcClientImpl : public Client,
void cancel() override;
void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend = 0) override;

// Grpc::AsyncRequestCallbacks
95 changes: 65 additions & 30 deletions source/extensions/filters/http/ratelimit/ratelimit.cc
@@ -55,52 +55,66 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) {
return;
}

Router::RouteConstSharedPtr route = callbacks_->route();
if (!route || !route->routeEntry()) {
return;
std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, headers, false);
if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), getHitAddend());
initiating_call_ = false;
}
}

cluster_ = callbacks_->clusterInfo();
if (!cluster_) {
void Filter::populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
if (!on_stream_done) {
// To use the exact same context for both request and on_stream_done rate limiting descriptors,
// we save the route and per-route configuration here and use them later.
route_ = callbacks_->route();
cluster_ = callbacks_->clusterInfo();
}
if (!route_ || !cluster_) {
return;
}

std::vector<Envoy::RateLimit::Descriptor> descriptors;

const Router::RouteEntry* route_entry = route->routeEntry();
const Router::RouteEntry* route_entry = route_->routeEntry();
if (!route_entry) {
return;
}
if (!on_stream_done) {
initializeVirtualHostRateLimitOption(route_entry);
}
// Get all applicable rate limit policy entries for the route.
populateRateLimitDescriptors(route_entry->rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_entry->rateLimitPolicy(), descriptors, headers,
on_stream_done);

VhRateLimitOptions vh_rate_limit_option = getVirtualHostRateLimitOption(route);

switch (vh_rate_limit_option) {
switch (vh_rate_limits_) {
case VhRateLimitOptions::Ignore:
break;
case VhRateLimitOptions::Include:
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
break;
case VhRateLimitOptions::Override:
if (route_entry->rateLimitPolicy().empty()) {
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
}
break;
}
}

double Filter::getHitAddend() {
const StreamInfo::UInt32Accessor* hits_addend_filter_state =
callbacks_->streamInfo().filterState()->getDataReadOnly<StreamInfo::UInt32Accessor>(
HitsAddendFilterStateKey);
double hits_addend = 0;
if (hits_addend_filter_state != nullptr) {
hits_addend = hits_addend_filter_state->value();
}

if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), hits_addend);
initiating_call_ = false;
}
return hits_addend;
}

Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool) {
@@ -161,6 +175,16 @@ void Filter::onDestroy() {
if (state_ == State::Calling) {
state_ = State::Complete;
client_->cancel();
} else if (client_ != nullptr) {
std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, *request_headers_, true);
if (!descriptors.empty()) {
// Since this filter is being destroyed, we need to keep the client alive until the request
// is complete by leaking the client with OnStreamDoneCallBack.
auto callback = new OnStreamDoneCallBack(std::move(client_));
callback->client().limit(*callback, getDomain(), descriptors, Tracing::NullSpan::instance(),
absl::nullopt, getHitAddend());
}
}
}

@@ -256,9 +280,10 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
}
}

void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const {
void Filter::populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
for (const Router::RateLimitPolicyEntry& rate_limit :
rate_limit_policy.getApplicableRateLimit(config_->stage())) {
const std::string& disable_key = rate_limit.disableKey();
@@ -267,8 +292,11 @@ void Filter::populateRateLimitDescriptors(const Router::RateLi
fmt::format("ratelimit.{}.http_filter_enabled", disable_key), 100)) {
continue;
}
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
const bool apply_on_stream_done = rate_limit.applyOnStreamDone();
if (on_stream_done == apply_on_stream_done) {
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
}
}
}

@@ -296,8 +324,8 @@ void Filter::appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add) {
}
}

VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route) {
if (route->routeEntry()->includeVirtualHostRateLimits()) {
void Filter::initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry) {
if (route_entry->includeVirtualHostRateLimits()) {
vh_rate_limits_ = VhRateLimitOptions::Include;
} else {
const auto* specific_per_route_config =
Expand All @@ -318,7 +346,6 @@ VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteCons
vh_rate_limits_ = VhRateLimitOptions::Override;
}
}
return vh_rate_limits_;
}

std::string Filter::getDomain() {
@@ -330,6 +357,14 @@ std::string Filter::getDomain() {
return config_->domain();
}

void OnStreamDoneCallBack::complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&,
Http::ResponseHeaderMapPtr&&, Http::RequestHeaderMapPtr&&,
const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) {
delete this;
}

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
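`getHitAddend()` above reads the per-stream addend out of filter state; the proto comment in this PR suggests pairing the stream-done entry with another filter (for example Lua) that derives that addend from the response. A hedged C++ sketch of such a companion encoder filter, assuming the filter state key is the `envoy.ratelimit.hits_addend` string referenced in the commit messages; the filter name, header name, and registration glue are hypothetical:

```cpp
#include "absl/strings/numbers.h"
#include "source/common/stream_info/uint32_accessor_impl.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"

namespace Example {

// Parses a usage value returned by the upstream and stores it in per-stream filter state,
// where the rate limit filter's getHitAddend() can pick it up for the stream-done request.
class UsageToHitsAddendFilter : public Envoy::Http::PassThroughFilter {
public:
  Envoy::Http::FilterHeadersStatus encodeHeaders(Envoy::Http::ResponseHeaderMap& headers,
                                                 bool) override {
    // "x-usage-units" is an assumed, application-specific response header.
    const auto usage = headers.get(Envoy::Http::LowerCaseString("x-usage-units"));
    uint32_t units = 0;
    if (!usage.empty() && absl::SimpleAtoi(usage[0]->value().getStringView(), &units)) {
      encoder_callbacks_->streamInfo().filterState()->setData(
          "envoy.ratelimit.hits_addend",
          std::make_shared<Envoy::StreamInfo::UInt32AccessorImpl>(units),
          Envoy::StreamInfo::FilterState::StateType::ReadOnly,
          Envoy::StreamInfo::FilterState::LifeSpan::FilterChain);
    }
    return Envoy::Http::FilterHeadersStatus::Continue;
  }
};

} // namespace Example
```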
34 changes: 30 additions & 4 deletions source/extensions/filters/http/ratelimit/ratelimit.h
@@ -185,12 +185,16 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req

private:
void initiateCall(const Http::RequestHeaderMap& headers);
void populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const;
void populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers, bool on_stream_done);
void populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done);
void populateResponseHeaders(Http::HeaderMap& response_headers, bool from_local_reply);
void appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add);
VhRateLimitOptions getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route);
double getHitAddend();
void initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry);
std::string getDomain();

Http::Context& httpContext() { return config_->httpContext(); }
@@ -203,11 +207,33 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req
State state_{State::NotStarted};
VhRateLimitOptions vh_rate_limits_;
Upstream::ClusterInfoConstSharedPtr cluster_;
Router::RouteConstSharedPtr route_ = nullptr;
bool initiating_call_{};
Http::ResponseHeaderMapPtr response_headers_to_add_;
Http::RequestHeaderMap* request_headers_{};
};

/**
* This implements the rate limit callback that outlives the filter holding the client.
* On completion, it deletes itself.
*/
class OnStreamDoneCallBack : public Filters::Common::RateLimit::RequestCallbacks {
public:
OnStreamDoneCallBack(Filters::Common::RateLimit::ClientPtr client) : client_(std::move(client)) {}
~OnStreamDoneCallBack() override = default;

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&, Http::ResponseHeaderMapPtr&&,
Http::RequestHeaderMapPtr&&, const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) override;

Filters::Common::RateLimit::Client& client() { return *client_; }

private:
Filters::Common::RateLimit::ClientPtr client_;
};

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
2 changes: 1 addition & 1 deletion test/extensions/filters/common/ratelimit/mocks.h
@@ -26,7 +26,7 @@ class MockClient : public Client {
MOCK_METHOD(void, limit,
(RequestCallbacks & callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend));
};
