Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http ratelimit: option to reduce budget on stream done #37548

Merged
merged 30 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9deac5f
ratelimit: option to excute action on stream done
mathetake Dec 6, 2024
5fe0370
format
mathetake Dec 6, 2024
32ea3e9
more clarify
mathetake Dec 6, 2024
e346217
clarify more
mathetake Dec 7, 2024
c8e0b73
Apply feedback in offline: action is either request or response, not …
mathetake Dec 9, 2024
6166b90
Apply offline review: remove the mention of envoy.ratelimit.hits_adde…
mathetake Dec 9, 2024
cab0154
more comments
mathetake Dec 9, 2024
371e4c6
Put it in the correct place
mathetake Dec 9, 2024
7e77a7f
Filter config level
mathetake Dec 9, 2024
8f29563
mention envoy.ratelimit.hits_addend
mathetake Dec 11, 2024
a02e932
Merge remote-tracking branch 'origin/main' into actionapplyondone
mathetake Dec 11, 2024
6598d44
Add more comments
mathetake Dec 11, 2024
59c2d1d
impl
mathetake Dec 11, 2024
3047603
format
mathetake Dec 11, 2024
c812eec
adds tests
mathetake Dec 11, 2024
d4448c4
ok
mathetake Dec 12, 2024
7b9bd57
simplifies
mathetake Dec 12, 2024
011a816
more
mathetake Dec 12, 2024
57564f4
review: flag on descriptor/policy level
mathetake Dec 12, 2024
146af55
more
mathetake Dec 12, 2024
702e427
reviews: apply comments
mathetake Dec 17, 2024
5aaa9c1
Merge remote-tracking branch 'origin/main' into actionapplyondone
mathetake Dec 17, 2024
f3c7b32
review: get descriptors in done
mathetake Dec 18, 2024
74a6cc9
typo
mathetake Dec 18, 2024
0ba3b25
remove unnecessary change
mathetake Dec 18, 2024
8823652
Apply review comments
mathetake Dec 18, 2024
946ce6f
review: save route_ and per-route vh_rate_limits_ to for consistency
mathetake Dec 18, 2024
b100c6d
Merge remote-tracking branch 'origin/main' into actionapplyondone
mathetake Dec 18, 2024
5dec975
simplify
mathetake Dec 18, 2024
b76e3d7
apply review commments
mathetake Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/envoy/config/route/v3/route_components.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,7 @@ message VirtualCluster {

// Global rate limiting :ref:`architecture overview <arch_overview_global_rate_limit>`.
// Also applies to Local rate limiting :ref:`using descriptors <config_http_filters_local_rate_limit_descriptors>`.
// [#next-free-field: 6]
message RateLimit {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.route.RateLimit";

Expand Down Expand Up @@ -2193,6 +2194,23 @@ message RateLimit {
// from metadata, no override is provided. See :ref:`rate limit override
// <config_http_filters_rate_limit_rate_limit_override>` for more information.
Override limit = 4;

// If true, the rate limit request will be applied when the stream completes. The default value is false.
// This is useful when the rate limit budget needs to reflect the response context that is not available
// on the request path.
//
// For example, let's say the upstream service calculates the usage statistics and returns them in the response body
// and we want to utilize these numbers to apply the rate limit action for the subsequent requests.
// Combined with another filter that can set the desired addend based on the response (e.g. Lua filter),
// this can be used to subtract the usage statistics from the rate limit budget.
//
// A rate limit applied on the stream completion is "fire-and-forget" by nature, and rate limit is not enforced by this config.
// In other words, the current request won't be blocked when this is true, but the budget will be updated for the subsequent
// requests based on the action with this field set to true. Users should ensure that the rate limit is enforced by the actions
// applied on the request path, i.e. the ones with this field set to false.
//
// Currently, this is only supported by the HTTP global rate filter.
bool apply_on_stream_done = 5;
}

// .. attention::
Expand Down
5 changes: 5 additions & 0 deletions envoy/router/router_ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class RateLimitPolicyEntry {
*/
virtual const std::string& disableKey() const PURE;

/**
* @return true if this rate limit policy should be applied on stream done.
*/
virtual bool applyOnStreamDone() const PURE;

/**
* Potentially populate the descriptor array with new descriptors to query.
* @param descriptors supplies the descriptor array to optionally fill.
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router_ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ RateLimitPolicyEntryImpl::RateLimitPolicyEntryImpl(
const envoy::config::route::v3::RateLimit& config,
Server::Configuration::CommonFactoryContext& context, absl::Status& creation_status)
: disable_key_(config.disable_key()),
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))) {
stage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, stage, 0))),
apply_on_stream_done_(config.apply_on_stream_done()) {
for (const auto& action : config.actions()) {
switch (action.action_specifier_case()) {
case envoy::config::route::v3::RateLimit::Action::ActionSpecifierCase::kSourceCluster:
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router_ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,14 @@ class RateLimitPolicyEntryImpl : public RateLimitPolicyEntry {
const std::string& local_service_cluster,
const Http::RequestHeaderMap&,
const StreamInfo::StreamInfo& info) const override;
bool applyOnStreamDone() const override { return apply_on_stream_done_; }

private:
const std::string disable_key_;
uint64_t stage_;
std::vector<RateLimit::DescriptorProducerPtr> actions_;
absl::optional<RateLimitOverrideActionPtr> limit_override_ = absl::nullopt;
const bool apply_on_stream_done_ = false;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/common/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Client {
*/
virtual void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend) PURE;
};

Expand Down
18 changes: 11 additions & 7 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ void GrpcClientImpl::createRequest(envoy::service::ratelimit::v3::RateLimitReque

void GrpcClientImpl::limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
uint32_t hits_addend) {
Tracing::Span& parent_span,
OptRef<const StreamInfo::StreamInfo> stream_info, uint32_t hits_addend) {
ASSERT(callbacks_ == nullptr);
callbacks_ = &callbacks;

envoy::service::ratelimit::v3::RateLimitRequest request;
createRequest(request, domain, descriptors, hits_addend);

request_ =
async_client_->send(service_method_, request, *this, parent_span,
Http::AsyncClient::RequestOptions().setTimeout(timeout_).setParentContext(
Http::AsyncClient::ParentContext{&stream_info}));
auto options = Http::AsyncClient::RequestOptions().setTimeout(timeout_);
if (stream_info.has_value()) {
options.setParentContext(Http::AsyncClient::ParentContext{stream_info.ptr()});
}
request_ = async_client_->send(service_method_, request, *this, parent_span, options);
}

void GrpcClientImpl::onSuccess(
Expand Down Expand Up @@ -118,8 +119,11 @@ void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::strin
ASSERT(status != Grpc::Status::WellKnownGrpcStatus::Ok);
ENVOY_LOG_TO_LOGGER(Logger::Registry::getLog(Logger::Id::filter), debug,
"rate limit fail, status={} msg={}", status, msg);
callbacks_->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
// The rate limit requests applied on stream-done will destroy the client inside the complete
// callback, so we release the callback here to make the destructor happy.
auto call_backs = callbacks_;
callbacks_ = nullptr;
call_backs->complete(LimitStatus::Error, nullptr, nullptr, nullptr, EMPTY_STRING, nullptr);
mathetake marked this conversation as resolved.
Show resolved Hide resolved
}

ClientPtr rateLimitClient(Server::Configuration::FactoryContext& context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GrpcClientImpl : public Client,
void cancel() override;
void limit(RequestCallbacks& callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend = 0) override;

// Grpc::AsyncRequestCallbacks
Expand Down
68 changes: 50 additions & 18 deletions source/extensions/filters/http/ratelimit/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) {
return;
}

std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, headers, false);
if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), getHitAddend());
initiating_call_ = false;
}
}

void Filter::populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
Router::RouteConstSharedPtr route = callbacks_->route();
if (!route || !route->routeEntry()) {
return;
Expand All @@ -65,42 +79,38 @@ void Filter::initiateCall(const Http::RequestHeaderMap& headers) {
return;
}

std::vector<Envoy::RateLimit::Descriptor> descriptors;

const Router::RouteEntry* route_entry = route->routeEntry();
// Get all applicable rate limit policy entries for the route.
populateRateLimitDescriptors(route_entry->rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route_entry->rateLimitPolicy(), descriptors, headers,
on_stream_done);

VhRateLimitOptions vh_rate_limit_option = getVirtualHostRateLimitOption(route);

switch (vh_rate_limit_option) {
case VhRateLimitOptions::Ignore:
break;
case VhRateLimitOptions::Include:
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
break;
case VhRateLimitOptions::Override:
if (route_entry->rateLimitPolicy().empty()) {
populateRateLimitDescriptors(route->virtualHost().rateLimitPolicy(), descriptors, headers);
populateRateLimitDescriptorsForPolicy(route->virtualHost().rateLimitPolicy(), descriptors,
headers, on_stream_done);
}
break;
}
}

double Filter::getHitAddend() {
const StreamInfo::UInt32Accessor* hits_addend_filter_state =
callbacks_->streamInfo().filterState()->getDataReadOnly<StreamInfo::UInt32Accessor>(
HitsAddendFilterStateKey);
double hits_addend = 0;
if (hits_addend_filter_state != nullptr) {
hits_addend = hits_addend_filter_state->value();
}

if (!descriptors.empty()) {
state_ = State::Calling;
initiating_call_ = true;
client_->limit(*this, getDomain(), descriptors, callbacks_->activeSpan(),
callbacks_->streamInfo(), hits_addend);
initiating_call_ = false;
}
return hits_addend;
}

Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool) {
Expand Down Expand Up @@ -161,6 +171,16 @@ void Filter::onDestroy() {
if (state_ == State::Calling) {
state_ = State::Complete;
client_->cancel();
} else if (client_ != nullptr) {
std::vector<Envoy::RateLimit::Descriptor> descriptors;
populateRateLimitDescriptors(descriptors, *request_headers_, true);
if (!descriptors.empty()) {
// Since this filter is being destroyed, we need to keep the client alive until the request
// is complete by leaking the client with OnStreamDoneCallBack.
auto callback = new OnStreamDoneCallBack(std::move(client_));
callback->client().limit(*callback, getDomain(), descriptors, Tracing::NullSpan::instance(),
absl::nullopt, getHitAddend());
}
}
}

Expand Down Expand Up @@ -256,9 +276,10 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
}
}

void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const {
void Filter::populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done) {
for (const Router::RateLimitPolicyEntry& rate_limit :
rate_limit_policy.getApplicableRateLimit(config_->stage())) {
const std::string& disable_key = rate_limit.disableKey();
Expand All @@ -267,8 +288,11 @@ void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_li
fmt::format("ratelimit.{}.http_filter_enabled", disable_key), 100)) {
continue;
}
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
const bool apply_on_stream_done = rate_limit.applyOnStreamDone();
if (on_stream_done == apply_on_stream_done) {
rate_limit.populateDescriptors(descriptors, config_->localInfo().clusterName(), headers,
callbacks_->streamInfo());
}
mathetake marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -330,6 +354,14 @@ std::string Filter::getDomain() {
return config_->domain();
}

void OnStreamDoneCallBack::complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&,
Http::ResponseHeaderMapPtr&&, Http::RequestHeaderMapPtr&&,
const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) {
delete this;
}

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
Expand Down
31 changes: 28 additions & 3 deletions source/extensions/filters/http/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,15 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req

private:
void initiateCall(const Http::RequestHeaderMap& headers);
void populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers) const;
void populateRateLimitDescriptors(std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers, bool on_stream_done);
void populateRateLimitDescriptorsForPolicy(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Http::RequestHeaderMap& headers,
bool on_stream_done);
void populateResponseHeaders(Http::HeaderMap& response_headers, bool from_local_reply);
void appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add);
double getHitAddend();
VhRateLimitOptions getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route);
std::string getDomain();

Expand All @@ -208,6 +212,27 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req
Http::RequestHeaderMap* request_headers_{};
};

/**
* This implements the rate limit callback that outlives the filter holding the client.
* On completion, it deletes itself.
*/
class OnStreamDoneCallBack : public Filters::Common::RateLimit::RequestCallbacks {
public:
OnStreamDoneCallBack(Filters::Common::RateLimit::ClientPtr client) : client_(std::move(client)) {}
~OnStreamDoneCallBack() override = default;

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus,
Filters::Common::RateLimit::DescriptorStatusListPtr&&, Http::ResponseHeaderMapPtr&&,
Http::RequestHeaderMapPtr&&, const std::string&,
Filters::Common::RateLimit::DynamicMetadataPtr&&) override;

Filters::Common::RateLimit::Client& client() { return *client_; }

private:
Filters::Common::RateLimit::ClientPtr client_;
};

} // namespace RateLimitFilter
} // namespace HttpFilters
} // namespace Extensions
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/common/ratelimit/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class MockClient : public Client {
MOCK_METHOD(void, limit,
(RequestCallbacks & callbacks, const std::string& domain,
const std::vector<Envoy::RateLimit::Descriptor>& descriptors,
Tracing::Span& parent_span, const StreamInfo::StreamInfo& stream_info,
Tracing::Span& parent_span, OptRef<const StreamInfo::StreamInfo> stream_info,
uint32_t hits_addend));
};

Expand Down
31 changes: 27 additions & 4 deletions test/extensions/filters/http/ratelimit/ratelimit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,13 @@ TEST_F(HttpRateLimitFilterTest, OkResponseWithAdditionalHitsAddend) {

filter_callbacks_.stream_info_.filter_state_->setData(
"envoy.ratelimit.hits_addend", std::make_unique<StreamInfo::UInt32AccessorImpl>(5),
StreamInfo::FilterState::StateType::ReadOnly);
StreamInfo::FilterState::StateType::Mutable);
EXPECT_CALL(filter_callbacks_.route_->route_entry_.rate_limit_policy_, getApplicableRateLimit(0));

EXPECT_CALL(route_rate_limit_, populateDescriptors(_, _, _, _))
.WillOnce(SetArgReferee<0>(descriptor_));

EXPECT_CALL(filter_callbacks_.route_->virtual_host_.rate_limit_policy_,
getApplicableRateLimit(0));

EXPECT_CALL(vh_rate_limit_, applyOnStreamDone()).WillRepeatedly(Return(true));
EXPECT_CALL(*client_, limit(_, "foo",
testing::ContainerEq(std::vector<RateLimit::Descriptor>{
{{{"descriptor_key", "descriptor_value"}}}}),
Expand Down Expand Up @@ -304,6 +302,31 @@ TEST_F(HttpRateLimitFilterTest, OkResponseWithAdditionalHitsAddend) {

EXPECT_EQ(
1U, filter_callbacks_.clusterInfo()->statsScope().counterFromStatName(ratelimit_ok_).value());

// Test the behavior for the apply_on_stream_done flag.
testing::Mock::VerifyAndClearExpectations(client_);
testing::Mock::VerifyAndClearExpectations(&filter_callbacks_);
testing::Mock::VerifyAndClearExpectations(
&filter_callbacks_.route_->route_entry_.rate_limit_policy_);
testing::Mock::VerifyAndClearExpectations(&route_rate_limit_);
testing::Mock::VerifyAndClearExpectations(&vh_rate_limit_);
filter_callbacks_.stream_info_.filter_state_->setData(
// Ensures that addend can be set differently than the request path.
"envoy.ratelimit.hits_addend", std::make_unique<StreamInfo::UInt32AccessorImpl>(100),
StreamInfo::FilterState::StateType::Mutable);
EXPECT_CALL(filter_callbacks_.route_->route_entry_.rate_limit_policy_, getApplicableRateLimit(0));
EXPECT_CALL(vh_rate_limit_, applyOnStreamDone()).WillRepeatedly(Return(true));
EXPECT_CALL(vh_rate_limit_, populateDescriptors(_, _, _, _))
.WillOnce(SetArgReferee<0>(descriptor_two_));
EXPECT_CALL(*client_, cancel());
EXPECT_CALL(*client_, limit(_, "foo", testing::ContainerEq(descriptor_two_), _, _, 100))
.WillOnce(
WithArgs<0>(Invoke([&](Filters::Common::RateLimit::RequestCallbacks& callbacks) -> void {
request_callbacks_ = &callbacks;
})));
filter_->onDestroy();
request_callbacks_->complete(Filters::Common::RateLimit::LimitStatus::OK, nullptr, nullptr,
nullptr, "", nullptr);
}

TEST_F(HttpRateLimitFilterTest, OkResponseWithHeaders) {
Expand Down
1 change: 1 addition & 0 deletions test/mocks/router/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ class MockRateLimitPolicyEntry : public RateLimitPolicyEntry {
const std::string& local_service_cluster, const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& info),
(const));
MOCK_METHOD(bool, applyOnStreamDone, (), (const));

uint64_t stage_{};
std::string disable_key_;
Expand Down
Loading