http ratelimit: option to reduce budget on stream done (#37548)
Commit Message: ratelimit: option to execute action on stream done

Additional Description:
This adds a new option `apply_on_stream_done` to the rate limit
policy corresponding to each descriptor. It allows descriptors to be
configured to execute in a response-content-aware way without
enforcing the rate limit (in other words, "fire-and-forget"). Since
the addend can currently be controlled via metadata per descriptor,
another filter, such as a Lua or Ext Proc filter, can be used to set
the value there to reflect the intended cost.

This use case arises from LLM API services, which usually return
usage statistics in the response body. More specifically, they offer
"streaming" APIs whose response is a line-by-line event stream in
which the very last line contains the usage statistics. The lazy
nature of this action is perfectly fine for these use cases, since
the rate limit effectively means "you are forbidden starting from
the next request".

Beyond the LLM-specific scenario, I've also encountered this use case
in data center resource allocation, where operators want to block
further computation from the next request onward based on how many
resources the current request consumed.

Ref: envoyproxy/gateway#4756

Risk Level: low
Testing: done
Docs Changes: done
Release Notes: TODO
Platform Specific Features: n/a

---------

Signed-off-by: Takeshi Yoneda <[email protected]>
mathetake authored Dec 19, 2024
1 parent 99bbfe8 commit 857107b
Showing 12 changed files with 168 additions and 52 deletions.
19 changes: 18 additions & 1 deletion api/envoy/config/route/v3/route_components.proto
@@ -1868,7 +1868,7 @@ message VirtualCluster {

// Global rate limiting :ref:`architecture overview <arch_overview_global_rate_limit>`.
// Also applies to Local rate limiting :ref:`using descriptors <config_http_filters_local_rate_limit_descriptors>`.
// [#next-free-field: 6]
// [#next-free-field: 7]
message RateLimit {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.route.RateLimit";

@@ -2245,6 +2245,23 @@ message RateLimit {
// :ref:`VirtualHost.typed_per_filter_config<envoy_v3_api_field_config.route.v3.VirtualHost.typed_per_filter_config>` or
// :ref:`Route.typed_per_filter_config<envoy_v3_api_field_config.route.v3.Route.typed_per_filter_config>`, etc.
HitsAddend hits_addend = 5;

// If true, the rate limit request will be applied when the stream completes. The default value is false.
// This is useful when the rate limit budget needs to reflect the response context that is not available
// on the request path.
//
// For example, let's say the upstream service calculates the usage statistics and returns them in the response body
// and we want to utilize these numbers to apply the rate limit action for the subsequent requests.
// Combined with another filter that can set the desired addend based on the response (e.g. Lua filter),
// this can be used to subtract the usage statistics from the rate limit budget.
//
// A rate limit applied on the stream completion is "fire-and-forget" by nature, and rate limit is not enforced by this config.
// In other words, the current request won't be blocked when this is true, but the budget will be updated for the subsequent
// requests based on the action with this field set to true. Users should ensure that the rate limit is enforced by the actions
// applied on the request path, i.e. the ones with this field set to false.
//
// Currently, this is only supported by the HTTP global rate limit filter.
bool apply_on_stream_done = 6;
}

// .. attention::
5 changes: 5 additions & 0 deletions envoy/router/router_ratelimit.h
@@ -48,6 +48,11 @@ class RateLimitPolicyEntry {
*/
virtual const std::string& disableKey() const PURE;

/**
* @return true if this rate limit policy should be applied on stream done.
*/
virtual bool applyOnStreamDone() const PURE;

/**
* Potentially populate the descriptor array with new descriptors to query.
* @param descriptors supplies the descriptor array to optionally fill.
3 changes: 2 additions & 1 deletion source/common/router/router_ratelimit.cc
@@ -267,7 +267,8 @@ RateLimitPolicyEntryImpl::RateLimitPolicyEntryImpl(
const envoy::config::route::v3::RateLimit& config,
Server::Configuration::CommonFactoryContext& context, absl::Status& creation_status)
: disable_key_(config.disable_key()),
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))) {
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))),
apply_on_stream_done_(config.apply_on_stream_done()) {
for (const auto& action : config.actions()) {
switch (action.action_specifier_case()) {
case envoy::config::route::v3::RateLimit::Action::ActionSpecifierCase::kSourceCluster:
2 changes: 2 additions & 0 deletions source/common/router/router_ratelimit.h
@@ -256,12 +256,14 @@ class RateLimitPolicyEntryImpl : public RateLimitPolicyEntry {
const std::string& local_service_cluster,
const Http::RequestHeaderMap&,
const StreamInfo::StreamInfo& info) const override;
bool applyOnStreamDone() const override { return apply_on_stream_done_; }

private:
const std::string disable_key_;
uint64_t stage_;
std::vector<RateLimit::DescriptorProducerPtr> actions_;
absl::optional<RateLimitOverrideActionPtr> limit_override_ = absl::nullopt;
const bool apply_on_stream_done_ = false;
};

/**
2 changes: 1 addition & 1 deletion source/extensions/filters/common/ratelimit/ratelimit.h
@@ -90,7 +90,7 @@ class Client {
*/
virtual void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend) PURE;
};

25 changes: 16 additions & 9 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
@@ -59,18 +59,19 @@ void GrpcClientImpl::createRequest(envoy::service::ratelimit::v3::RateLimitReque

void GrpcClientImpl::limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
uint32_t hits_addend) {
Tracing::Span& parent_span,
OptRef<const StreamInfo::StreamInfo> stream_info, uint32_t hits_addend) {
ASSERT(callbacks_ == nullptr);
callbacks_ = &callbacks;

envoy::service::ratelimit::v3::RateLimitRequest request;
createRequest(request, domain, descriptors, hits_addend);

request_ =
async_client_->send(service_method_, request, *this, parent_span,
Http::AsyncClient::RequestOptions().setTimeout(timeout_).setParentContext(
Http::AsyncClient::ParentContext{&stream_info}));
auto options = Http::AsyncClient::RequestOptions().setTimeout(timeout_);
if (stream_info.has_value()) {
options.setParentContext(Http::AsyncClient::ParentContext{stream_info.ptr()});
}
request_ = async_client_->send(service_method_, request, *this, parent_span, options);
}

void GrpcClientImpl::onSuccess(
@@ -107,19 +108,25 @@ void GrpcClientImpl::onSuccess(
response->has_dynamic_metadata()
? std::make_unique<ProtobufWkt::Struct>(response->dynamic_metadata())
: nullptr;
callbacks_->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add),
// The rate limit requests applied on stream-done will destroy the client inside the complete
// callback, so we release the callback here to make the destructor happy.
auto call_backs = callbacks_;
callbacks_ = nullptr;
call_backs->complete(status, std::move(descriptor_statuses), std::move(response_headers_to_add),
std::move(request_headers_to_add), response->raw_body(),
std::move(dynamic_metadata));
callbacks_ = nullptr;
}

void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::string& msg,
Tracing::Span&) {
ASSERT(status != Grpc::Status::WellKnownGrpcStatus::Ok);
ENVOY_LOG_TO_LOGGER(Logger::Registry::getLog(Logger::Id::filter), debug,
"rate limit fail, status={} msg={}", status, msg);
callbacks_->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
// The rate limit requests applied on stream-done will destroy the client inside the complete
// callback, so we release the callback here to make the destructor happy.
auto call_backs = callbacks_;
callbacks_ = nullptr;
call_backs->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
}

ClientPtr rateLimitClient(Server::Configuration::FactoryContext& context,
@@ -57,7 +57,7 @@ class GrpcClientImpl : public Client,
void cancel() override;
void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend = 0) override;

// Grpc::AsyncRequestCallbacks
95 changes: 65 additions & 30 deletions source/extensions/filters/http/ratelimit/ratelimit.cc
@@ -55,52 +55,66 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) {
return;
}

Router::RouteConstSharedPtr route = callbacks_->route();
if (!route || !route->routeEntry()) {
return;
std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, headers, false);
if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), getHitAddend());
initiating_call_ = false;
}
}

cluster_ = callbacks_->clusterInfo();
if (!cluster_) {
void Filter::populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
if (!on_stream_done) {
// To use the exact same context for both request and on_stream_done rate limiting descriptors,
// we save the route and per-route configuration here and use them later.
route_ = callbacks_->route();
cluster_ = callbacks_->clusterInfo();
}
if (!route_ || !cluster_) {
return;
}

std::vector<Envoy::RateLimit::Descriptor> descriptors;

const Router::RouteEntry* route_entry = route->routeEntry();
const Router::RouteEntry* route_entry = route_->routeEntry();
if (!route_entry) {
return;
}
if (!on_stream_done) {
initializeVirtualHostRateLimitOption(route_entry);
}
// Get all applicable rate limit policy entries for the route.
populateRateLimitDescriptors(route_entry->rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_entry->rateLimitPolicy(), descriptors, headers,
on_stream_done);

VhRateLimitOptions vh_rate_limit_option = getVirtualHostRateLimitOption(route);

switch (vh_rate_limit_option) {
switch (vh_rate_limits_) {
case VhRateLimitOptions::Ignore:
break;
case VhRateLimitOptions::Include:
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
break;
case VhRateLimitOptions::Override:
if (route_entry->rateLimitPolicy().empty()) {
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
}
break;
}
}

double Filter::getHitAddend() {
const StreamInfo::UInt32Accessor* hits_addend_filter_state =
callbacks_->streamInfo().filterState()->getDataReadOnly<StreamInfo::UInt32Accessor>(
HitsAddendFilterStateKey);
double hits_addend = 0;
if (hits_addend_filter_state != nullptr) {
hits_addend = hits_addend_filter_state->value();
}

if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), hits_addend);
initiating_call_ = false;
}
return hits_addend;
}

Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool) {
@@ -161,6 +175,16 @@ void Filter::onDestroy() {
if (state_ == State::Calling) {
state_ = State::Complete;
client_->cancel();
} else if (client_ != nullptr) {
std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, *request_headers_, true);
if (!descriptors.empty()) {
// Since this filter is being destroyed, we need to keep the client alive until the request
// is complete by leaking the client with OnStreamDoneCallBack.
auto callback = new OnStreamDoneCallBack(std::move(client_));
callback->client().limit(*callback, getDomain(), descriptors, Tracing::NullSpan::instance(),
absl::nullopt, getHitAddend());
}
}
}

@@ -256,9 +280,10 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
}
}

void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const {
void Filter::populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
for (const Router::RateLimitPolicyEntry& rate_limit :
rate_limit_policy.getApplicableRateLimit(config_->stage())) {
const std::string& disable_key = rate_limit.disableKey();
@@ -267,8 +292,11 @@ void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_li
fmt::format("ratelimit.{}.http_filter_enabled", disable_key), 100)) {
continue;
}
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
const bool apply_on_stream_done = rate_limit.applyOnStreamDone();
if (on_stream_done == apply_on_stream_done) {
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
}
}
}

@@ -296,8 +324,8 @@ void Filter::appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add) {
}
}

VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route) {
if (route->routeEntry()->includeVirtualHostRateLimits()) {
void Filter::initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry) {
if (route_entry->includeVirtualHostRateLimits()) {
vh_rate_limits_ = VhRateLimitOptions::Include;
} else {
const auto* specific_per_route_config =
@@ -318,7 +346,6 @@ VhRateLimitOptions Filter::getVirtualHostRateLimitOption(const Router::RouteCons
vh_rate_limits_ = VhRateLimitOptions::Override;
}
}
return vh_rate_limits_;
}

std::string Filter::getDomain() {
@@ -330,6 +357,14 @@ std::string Filter::getDomain() {
return config_->domain();
}

void OnStreamDoneCallBack::complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&,
Http::ResponseHeaderMapPtr&&, Http::RequestHeaderMapPtr&&,
const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) {
delete this;
}

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
34 changes: 30 additions & 4 deletions source/extensions/filters/http/ratelimit/ratelimit.h
@@ -185,12 +185,16 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req

private:
void initiateCall(const Http::RequestHeaderMap& headers);
void populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const;
void populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers, bool on_stream_done);
void populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done);
void populateResponseHeaders(Http::HeaderMap& response_headers, bool from_local_reply);
void appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add);
VhRateLimitOptions getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route);
double getHitAddend();
void initializeVirtualHostRateLimitOption(const Router::RouteEntry* route_entry);
std::string getDomain();

Http::Context& httpContext() { return config_->httpContext(); }
@@ -203,11 +207,33 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req
State state_{State::NotStarted};
VhRateLimitOptions vh_rate_limits_;
Upstream::ClusterInfoConstSharedPtr cluster_;
Router::RouteConstSharedPtr route_ = nullptr;
bool initiating_call_{};
Http::ResponseHeaderMapPtr response_headers_to_add_;
Http::RequestHeaderMap* request_headers_{};
};

/**
* This implements the rate limit callback that outlives the filter holding the client.
* On completion, it deletes itself.
*/
class OnStreamDoneCallBack : public Filters::Common::RateLimit::RequestCallbacks {
public:
OnStreamDoneCallBack(Filters::Common::RateLimit::ClientPtr client) : client_(std::move(client)) {}
~OnStreamDoneCallBack() override = default;

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&, Http::ResponseHeaderMapPtr&&,
Http::RequestHeaderMapPtr&&, const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) override;

Filters::Common::RateLimit::Client& client() { return *client_; }

private:
Filters::Common::RateLimit::ClientPtr client_;
};

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
2 changes: 1 addition & 1 deletion test/extensions/filters/common/ratelimit/mocks.h
@@ -26,7 +26,7 @@ class MockClient : public Client {
MOCK_METHOD(void, limit,
(RequestCallbacks & callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend));
};
