Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http ratelimit: option to reduce budget on stream done #37548

Merged
merged 30 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9deac5f
ratelimit: option to excute action on stream done
mathetake Dec 6, 2024
5fe0370
format
mathetake Dec 6, 2024
32ea3e9
more clarify
mathetake Dec 6, 2024
e346217
clarify more
mathetake Dec 7, 2024
c8e0b73
Apply feedback in offline: action is either request or response, not …
mathetake Dec 9, 2024
6166b90
Apply offline review: remove the mention of envoy.ratelimit.hits_adde…
mathetake Dec 9, 2024
cab0154
more comments
mathetake Dec 9, 2024
371e4c6
Put it in the correct place
mathetake Dec 9, 2024
7e77a7f
Filter config level
mathetake Dec 9, 2024
8f29563
mention envoy.ratelimit.hits_addend
mathetake Dec 11, 2024
a02e932
Merge remote-tracking branch 'origin/main' into actionapplyondone
mathetake Dec 11, 2024
6598d44
Add more comments
mathetake Dec 11, 2024
59c2d1d
impl
mathetake Dec 11, 2024
3047603
format
mathetake Dec 11, 2024
c812eec
adds tests
mathetake Dec 11, 2024
d4448c4
ok
mathetake Dec 12, 2024
7b9bd57
simplifies
mathetake Dec 12, 2024
011a816
more
mathetake Dec 12, 2024
57564f4
review: flag on descriptor/policy level
mathetake Dec 12, 2024
146af55
more
mathetake Dec 12, 2024
702e427
reviews: apply comments
mathetake Dec 17, 2024
5aaa9c1
Merge remote-tracking branch 'origin/main' into actionapplyondone
mathetake Dec 17, 2024
f3c7b32
review: get descriptors in done
mathetake Dec 18, 2024
74a6cc9
typo
mathetake Dec 18, 2024
0ba3b25
remove unnecessary change
mathetake Dec 18, 2024
8823652
Apply review comments
mathetake Dec 18, 2024
946ce6f
review: save route_ and per-route vh_rate_limits_ to for consistency
mathetake Dec 18, 2024
b100c6d
Merge remote-tracking branch 'origin/main' into actionapplyondone
mathetake Dec 18, 2024
5dec975
simplify
mathetake Dec 18, 2024
b76e3d7
apply review commments
mathetake Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/envoy/config/route/v3/route_components.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,7 @@ message VirtualCluster {

// Global rate limiting :ref:`architecture overview <arch_overview_global_rate_limit>`.
// Also applies to Local rate limiting :ref:`using descriptors <config_http_filters_local_rate_limit_descriptors>`.
// [#next-free-field: 6]
message RateLimit {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.route.RateLimit";

Expand Down Expand Up @@ -2193,6 +2194,23 @@ message RateLimit {
// from metadata, no override is provided. See :ref:`rate limit override
// <config_http_filters_rate_limit_rate_limit_override>` for more information.
Override limit = 4;

// If true, the rate limit request will be applied when the stream completes. The default value is false.
// This is useful when the rate limit budget needs to reflect the response context that is not available
// on the request path.
//
// For example, let's say the upstream service calculates the usage statistics and returns them in the response body
// and we want to utilize these numbers to apply the rate limit action for the subsequent requests.
// Combined with another filter that can set the desired addend based on the response (e.g. Lua filter),
// this can be used to subtract the usage statistics from the rate limit budget.
//
// A rate limit applied on the stream completion is "fire-and-forget" by nature, and rate limit is not enforced by this config.
// In other words, the current request won't be blocked when this is true, but the budget will be updated for the subsequent
// requests based on the action with this field set to true. Users should ensure that the rate limit is enforced by the actions
// applied on the request path, i.e. the ones with this field set to false.
//
// Currently, this is only supported by the HTTP global rate filter.
bool apply_on_stream_done = 5;
}

// .. attention::
Expand Down
5 changes: 5 additions & 0 deletions envoy/router/router_ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class RateLimitPolicyEntry {
*/
virtual const std::string& disableKey() const PURE;

/**
* @return true if this rate limit policy should be applied on stream done.
*/
virtual bool applyOnStreamDone() const PURE;

/**
* Potentially populate the descriptor array with new descriptors to query.
* @param descriptors supplies the descriptor array to optionally fill.
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router_ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ RateLimitPolicyEntryImpl::RateLimitPolicyEntryImpl(
const envoy::config::route::v3::RateLimit& config,
Server::Configuration::CommonFactoryContext& context, absl::Status& creation_status)
: disable_key_(config.disable_key()),
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))) {
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))),
apply_on_stream_done_(config.apply_on_stream_done()) {
for (const auto& action : config.actions()) {
switch (action.action_specifier_case()) {
case envoy::config::route::v3::RateLimit::Action::ActionSpecifierCase::kSourceCluster:
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router_ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,14 @@ class RateLimitPolicyEntryImpl : public RateLimitPolicyEntry {
const std::string& local_service_cluster,
const Http::RequestHeaderMap&,
const StreamInfo::StreamInfo& info) const override;
bool applyOnStreamDone() const override { return apply_on_stream_done_; }

private:
const std::string disable_key_;
uint64_t stage_;
std::vector<RateLimit::DescriptorProducerPtr> actions_;
absl::optional<RateLimitOverrideActionPtr> limit_override_ = absl::nullopt;
bool apply_on_stream_done_;
mathetake marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/common/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Client {
*/
virtual void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend) PURE;
};

Expand Down
13 changes: 7 additions & 6 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ void GrpcClientImpl::createRequest(envoy::service::ratelimit::v3::RateLimitReque

void GrpcClientImpl::limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
uint32_t hits_addend) {
Tracing::Span& parent_span,
OptRef<const StreamInfo::StreamInfo> stream_info, uint32_t hits_addend) {
ASSERT(callbacks_ == nullptr);
callbacks_ = &callbacks;

envoy::service::ratelimit::v3::RateLimitRequest request;
createRequest(request, domain, descriptors, hits_addend);

request_ =
async_client_->send(service_method_, request, *this, parent_span,
Http::AsyncClient::RequestOptions().setTimeout(timeout_).setParentContext(
Http::AsyncClient::ParentContext{&stream_info}));
auto options = Http::AsyncClient::RequestOptions().setTimeout(timeout_);
if (stream_info) {
options.setParentContext(Http::AsyncClient::ParentContext{&*stream_info});
}
mathetake marked this conversation as resolved.
Show resolved Hide resolved
request_ = async_client_->send(service_method_, request, *this, parent_span, options);
}

void GrpcClientImpl::onSuccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GrpcClientImpl : public Client,
void cancel() override;
void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend = 0) override;

// Grpc::AsyncRequestCallbacks
Expand Down
54 changes: 43 additions & 11 deletions source/extensions/filters/http/ratelimit/ratelimit.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "source/extensions/filters/http/ratelimit/ratelimit.h"

#include <memory>
#include <string>
#include <vector>

Expand Down Expand Up @@ -86,21 +87,24 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) {
break;
}

if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), getHitAddend());
initiating_call_ = false;
}
}

double Filter::getHitAddend() {
const StreamInfo::UInt32Accessor* hits_addend_filter_state =
callbacks_->streamInfo().filterState()->getDataReadOnly<StreamInfo::UInt32Accessor>(
HitsAddendFilterStateKey);
double hits_addend = 0;
if (hits_addend_filter_state != nullptr) {
hits_addend = hits_addend_filter_state->value();
}

if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), hits_addend);
initiating_call_ = false;
}
return hits_addend;
}

Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool) {
Expand Down Expand Up @@ -161,6 +165,18 @@ void Filter::onDestroy() {
if (state_ == State::Calling) {
state_ = State::Complete;
client_->cancel();
} else {
mathetake marked this conversation as resolved.
Show resolved Hide resolved
// If the filter doesn't have an outstanding limit request (made during decodeHeaders) and has
// descriptors, then we can apply the rate limit on stream done if the config allows it.
if (!descriptors_apply_on_stream_done_.empty()) {
client_->cancel(); // Clears the internal state of the client, so that we can reuse it.
// Since this filter is being destroyed, we need to keep the client alive until the request
// is complete.
auto callback = new OnStreamDoneCallBack(std::move(client_));
auto& ref = *callback;
mathetake marked this conversation as resolved.
Show resolved Hide resolved
callback->client()->limit(ref, getDomain(), descriptors_apply_on_stream_done_,
Tracing::NullSpan::instance(), absl::nullopt, getHitAddend());
}
}
}

Expand Down Expand Up @@ -258,7 +274,7 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,

void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const {
const Http::RequestHeaderMap& headers) {
for (const Router::RateLimitPolicyEntry& rate_limit :
rate_limit_policy.getApplicableRateLimit(config_->stage())) {
const std::string& disable_key = rate_limit.disableKey();
Expand All @@ -267,8 +283,16 @@ void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_li
fmt::format("ratelimit.{}.http_filter_enabled", disable_key), 100)) {
continue;
}
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
if (rate_limit.applyOnStreamDone()) {
// If the rate limit policy is to be applied on stream done, we populate the descriptors in a
// separate vector. This vector will be used to apply the rate limit on stream done.
rate_limit.populateDescriptors(descriptors_apply_on_stream_done_,
config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
} else {
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
}
mathetake marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -330,6 +354,14 @@ std::string Filter::getDomain() {
return config_->domain();
}

void OnStreamDoneCallBack::complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&,
Http::ResponseHeaderMapPtr&&, Http::RequestHeaderMapPtr&&,
const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) {
delete this;
}

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
Expand Down
27 changes: 26 additions & 1 deletion source/extensions/filters/http/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req
void initiateCall(const Http::RequestHeaderMap& headers);
void populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const;
const Http::RequestHeaderMap& headers);
void populateResponseHeaders(Http::HeaderMap& response_headers, bool from_local_reply);
void appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add);
double getHitAddend();
VhRateLimitOptions getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route);
std::string getDomain();

Expand All @@ -206,6 +207,30 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req
bool initiating_call_{};
Http::ResponseHeaderMapPtr response_headers_to_add_;
Http::RequestHeaderMap* request_headers_{};
// Holds the descriptors that should be applied on stream done which will be populated during
// decodeHeaders.
std::vector<Envoy::RateLimit::Descriptor> descriptors_apply_on_stream_done_{};
};

/**
* This implements the rate limit callback that outlives the filter holding the client.
* On completion, it deletes itself.
*/
class OnStreamDoneCallBack : public Filters::Common::RateLimit::RequestCallbacks {
public:
OnStreamDoneCallBack(Filters::Common::RateLimit::ClientPtr client) : client_(std::move(client)) {}
~OnStreamDoneCallBack() override = default;

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&, Http::ResponseHeaderMapPtr&&,
Http::RequestHeaderMapPtr&&, const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) override;

Filters::Common::RateLimit::ClientPtr& client() { return client_; }
mathetake marked this conversation as resolved.
Show resolved Hide resolved

private:
Filters::Common::RateLimit::ClientPtr client_;
};

} // namespace RateLimitFilter
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/common/ratelimit/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class MockClient : public Client {
MOCK_METHOD(void, limit,
(RequestCallbacks & callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend));
};

Expand Down
23 changes: 22 additions & 1 deletion test/extensions/filters/http/ratelimit/ratelimit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ TEST_F(HttpRateLimitFilterTest, OkResponseWithAdditionalHitsAddend) {

filter_callbacks_.stream_info_.filter_state_->setData(
"envoy.ratelimit.hits_addend", std::make_unique<StreamInfo::UInt32AccessorImpl>(5),
StreamInfo::FilterState::StateType::ReadOnly);
StreamInfo::FilterState::StateType::Mutable);
EXPECT_CALL(filter_callbacks_.route_->route_entry_.rate_limit_policy_, getApplicableRateLimit(0));

EXPECT_CALL(route_rate_limit_, populateDescriptors(_, _, _, _))
Expand All @@ -274,6 +274,11 @@ TEST_F(HttpRateLimitFilterTest, OkResponseWithAdditionalHitsAddend) {
EXPECT_CALL(filter_callbacks_.route_->virtual_host_.rate_limit_policy_,
getApplicableRateLimit(0));

// Make sure that descriptor_two_ will be applied on stream done.
EXPECT_CALL(vh_rate_limit_, applyOnStreamDone()).WillOnce(Return(true));
EXPECT_CALL(vh_rate_limit_, populateDescriptors(_, _, _, _))
.WillOnce(SetArgReferee<0>(descriptor_two_));

EXPECT_CALL(*client_, limit(_, "foo",
testing::ContainerEq(std::vector<RateLimit::Descriptor>{
{{{"descriptor_key", "descriptor_value"}}}}),
Expand Down Expand Up @@ -304,6 +309,22 @@ TEST_F(HttpRateLimitFilterTest, OkResponseWithAdditionalHitsAddend) {

EXPECT_EQ(
1U, filter_callbacks_.clusterInfo()->statsScope().counterFromStatName(ratelimit_ok_).value());

// Test the behavior when apply_on_stream_done is true.
testing::Mock::VerifyAndClearExpectations(client_);
filter_callbacks_.stream_info_.filter_state_->setData(
// Ensures that addend can be set differently than the request path.
"envoy.ratelimit.hits_addend", std::make_unique<StreamInfo::UInt32AccessorImpl>(100),
StreamInfo::FilterState::StateType::Mutable);
EXPECT_CALL(*client_, cancel());
EXPECT_CALL(*client_, limit(_, "foo", testing::ContainerEq(descriptor_two_), _, _, 100))
.WillOnce(
WithArgs<0>(Invoke([&](Filters::Common::RateLimit::RequestCallbacks& callbacks) -> void {
request_callbacks_ = &callbacks;
})));
filter_->onDestroy();
request_callbacks_->complete(Filters::Common::RateLimit::LimitStatus::OK, nullptr, nullptr,
nullptr, "", nullptr);
}

TEST_F(HttpRateLimitFilterTest, OkResponseWithHeaders) {
Expand Down
1 change: 1 addition & 0 deletions test/mocks/router/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ class MockRateLimitPolicyEntry : public RateLimitPolicyEntry {
const std::string& local_service_cluster, const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& info),
(const));
MOCK_METHOD(bool, applyOnStreamDone, (), (const));

uint64_t stage_{};
std::string disable_key_;
Expand Down
Loading