diff --git a/CODEOWNERS b/CODEOWNERS index a252439569d6..9576cbb5a308 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -44,5 +44,7 @@ extensions/filters/common/original_src @snowp @klarose /*/extensions/filters/http/dynamic_forward_proxy @mattklein123 @alyssawilk # omit_canary_hosts retry predicate /*/extensions/retry/host/omit_canary_hosts @sriduth @snowp +# adaptive concurrency limit extension. +/*/extensions/filters/http/adaptive_concurrency @tonya11en @mattklein123 # http inspector /*/extensions/filters/listener/http_inspector @crazyxy @PiotrSikora @lizan diff --git a/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD new file mode 100644 index 000000000000..948ceec2223d --- /dev/null +++ b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD @@ -0,0 +1,11 @@ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal") + +licenses(["notice"]) # Apache 2 + +api_proto_library_internal( + name = "adaptive_concurrency", + srcs = ["adaptive_concurrency.proto"], + deps = [ + "//envoy/api/v2/core:base", + ], +) diff --git a/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.proto b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.proto new file mode 100644 index 000000000000..ff19657260e8 --- /dev/null +++ b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package envoy.config.filter.http.adaptive_concurrency.v2alpha; + +option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurrency.v2alpha"; +option java_outer_classname = "AdaptiveConcurrencyProto"; +option java_multiple_files = true; +option go_package = "v2alpha"; + +message AdaptiveConcurrency { +} diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index c18d6d6fa47e..b96c047dab66 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -29,6 +29,9 @@ EXTENSIONS = { # HTTP filters # + # NOTE: The adaptive concurrency filter does not have a proper filter + # implemented right now. We are just referencing the filter lib here. + "envoy.filters.http.adaptive_concurrency": "//source/extensions/filters/http/adaptive_concurrency:adaptive_concurrency_filter_lib", "envoy.filters.http.buffer": "//source/extensions/filters/http/buffer:config", "envoy.filters.http.cors": "//source/extensions/filters/http/cors:config", "envoy.filters.http.csrf": "//source/extensions/filters/http/csrf:config", @@ -37,9 +40,9 @@ EXTENSIONS = { "envoy.filters.http.ext_authz": "//source/extensions/filters/http/ext_authz:config", "envoy.filters.http.fault": "//source/extensions/filters/http/fault:config", "envoy.filters.http.grpc_http1_bridge": "//source/extensions/filters/http/grpc_http1_bridge:config", + "envoy.filters.http.grpc_http1_reverse_bridge": "//source/extensions/filters/http/grpc_http1_reverse_bridge:config", "envoy.filters.http.grpc_json_transcoder": "//source/extensions/filters/http/grpc_json_transcoder:config", "envoy.filters.http.grpc_web": "//source/extensions/filters/http/grpc_web:config", - "envoy.filters.http.grpc_http1_reverse_bridge": "//source/extensions/filters/http/grpc_http1_reverse_bridge:config", "envoy.filters.http.gzip": "//source/extensions/filters/http/gzip:config", "envoy.filters.http.header_to_metadata": "//source/extensions/filters/http/header_to_metadata:config", "envoy.filters.http.health_check": "//source/extensions/filters/http/health_check:config", diff --git a/source/extensions/filters/http/adaptive_concurrency/BUILD b/source/extensions/filters/http/adaptive_concurrency/BUILD new file mode 100644 index 000000000000..5d86773ebf8c --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/BUILD @@ -0,0 +1,26 @@ +licenses(["notice"]) # Apache 2 + +# HTTP L7 filter that dynamically adjusts the number of allowed concurrent +# requests based on sampled latencies. +# Public docs: TODO (tonya11en) + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "adaptive_concurrency_filter_lib", + srcs = ["adaptive_concurrency_filter.cc"], + hdrs = ["adaptive_concurrency_filter.h"], + deps = [ + "//include/envoy/http:filter_interface", + "//source/extensions/filters/http:well_known_names", + "//source/extensions/filters/http/adaptive_concurrency/concurrency_controller:concurrency_controller_lib", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc", + ], +) diff --git a/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.cc b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.cc new file mode 100644 index 000000000000..1ec4dd8247e2 --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.cc @@ -0,0 +1,47 @@ +#include "extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h" + +#include +#include +#include +#include + +#include "common/common/assert.h" + +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h" +#include "extensions/filters/http/well_known_names.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { + +AdaptiveConcurrencyFilterConfig::AdaptiveConcurrencyFilterConfig( + const envoy::config::filter::http::adaptive_concurrency::v2alpha::AdaptiveConcurrency&, + Runtime::Loader&, std::string stats_prefix, Stats::Scope&, TimeSource& time_source) + : stats_prefix_(std::move(stats_prefix)), time_source_(time_source) {} + +AdaptiveConcurrencyFilter::AdaptiveConcurrencyFilter( + AdaptiveConcurrencyFilterConfigSharedPtr config, ConcurrencyControllerSharedPtr controller) + : config_(std::move(config)), controller_(std::move(controller)) {} + +Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::HeaderMap&, bool) { + if (controller_->forwardingDecision() == ConcurrencyController::RequestForwardingAction::Block) { + // TODO (tonya11en): Remove filler words. + decoder_callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "filler words", nullptr, + absl::nullopt, "more filler words"); + return Http::FilterHeadersStatus::StopIteration; + } + + rq_start_time_ = config_->timeSource().monotonicTime(); + return Http::FilterHeadersStatus::Continue; +} + +void AdaptiveConcurrencyFilter::encodeComplete() { + const auto rq_latency = config_->timeSource().monotonicTime() - rq_start_time_; + controller_->recordLatencySample(rq_latency); +} + +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h new file mode 100644 index 000000000000..88070180272b --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include + +#include "envoy/common/time.h" +#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h" +#include "envoy/http/filter.h" +#include "envoy/runtime/runtime.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/stats_macros.h" + +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h" +#include "extensions/filters/http/common/pass_through_filter.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { + +/** + * Configuration for the adaptive concurrency limit filter. + */ +class AdaptiveConcurrencyFilterConfig { +public: + AdaptiveConcurrencyFilterConfig( + const envoy::config::filter::http::adaptive_concurrency::v2alpha::AdaptiveConcurrency& + adaptive_concurrency, + Runtime::Loader& runtime, std::string stats_prefix, Stats::Scope& scope, + TimeSource& time_source); + + TimeSource& timeSource() const { return time_source_; } + +private: + const std::string stats_prefix_; + TimeSource& time_source_; +}; + +using AdaptiveConcurrencyFilterConfigSharedPtr = + std::shared_ptr; +using ConcurrencyControllerSharedPtr = + std::shared_ptr; + +/** + * A filter that samples request latencies and dynamically adjusts the request + * concurrency window. + */ +class AdaptiveConcurrencyFilter : public Http::PassThroughFilter, + Logger::Loggable { +public: + AdaptiveConcurrencyFilter(AdaptiveConcurrencyFilterConfigSharedPtr config, + ConcurrencyControllerSharedPtr controller); + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap&, bool) override; + + // Http::StreamEncoderFilter + void encodeComplete() override; + +private: + AdaptiveConcurrencyFilterConfigSharedPtr config_; + const ConcurrencyControllerSharedPtr controller_; + MonotonicTime rq_start_time_; + std::unique_ptr forwarding_action_; +}; + +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD new file mode 100644 index 000000000000..d213690d63c6 --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD @@ -0,0 +1,23 @@ +licenses(["notice"]) # Apache 2 + +# HTTP L7 filter that dynamically adjusts the number of allowed concurrent +# requests based on sampled latencies. +# Public docs: TODO (tonya11en) + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "concurrency_controller_lib", + srcs = [], + hdrs = [ + "concurrency_controller.h", + ], + deps = [ + ], +) diff --git a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h new file mode 100644 index 000000000000..0c0dbe456c7d --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h @@ -0,0 +1,53 @@ +#pragma once + +#include + +#include "envoy/common/pure.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { +namespace ConcurrencyController { + +/** + * The controller's decision on whether a request will be forwarded. + */ +enum class RequestForwardingAction { + // The concurrency limit is exceeded, so the request cannot be forwarded. + Block, + + // The controller has allowed the request through and changed its internal + // state. The request must be forwarded. + Forward +}; + +/** + * Adaptive concurrency controller interface. All implementations of this + * interface must be thread-safe. + */ +class ConcurrencyController { +public: + virtual ~ConcurrencyController() = default; + + /** + * Called during decoding when the adaptive concurrency filter is attempting + * to forward a request. Returns its decision on whether to forward a request. + */ + virtual RequestForwardingAction forwardingDecision() PURE; + + /** + * Called during encoding when the request latency is known. Records the + * request latency to update the internal state of the controller for + * concurrency limit calculations. + * + * @param rq_latency is the clocked round-trip time for the request. + */ + virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE; +}; + +} // namespace ConcurrencyController +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/well_known_names.h b/source/extensions/filters/http/well_known_names.h index 3ce6643fa8fc..82341e13739f 100644 --- a/source/extensions/filters/http/well_known_names.h +++ b/source/extensions/filters/http/well_known_names.h @@ -54,6 +54,8 @@ class HttpFilterNameValues { const std::string HeaderToMetadata = "envoy.filters.http.header_to_metadata"; // Tap filter const std::string Tap = "envoy.filters.http.tap"; + // Adaptive concurrency limit filter + const std::string AdaptiveConcurrency = "envoy.filters.http.adaptive_concurrency"; // Original Src Filter const std::string OriginalSrc = "envoy.filters.http.original_src"; // Dynamic forward proxy filter diff --git a/test/extensions/filters/http/adaptive_concurrency/BUILD b/test/extensions/filters/http/adaptive_concurrency/BUILD new file mode 100644 index 000000000000..14aa0a108be2 --- /dev/null +++ b/test/extensions/filters/http/adaptive_concurrency/BUILD @@ -0,0 +1,28 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test_library", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +envoy_package() + +envoy_extension_cc_test( + name = "adaptive_concurrency_filter_test", + srcs = ["adaptive_concurrency_filter_test.cc"], + extension_name = "envoy.filters.http.adaptive_concurrency", + deps = [ + "//source/common/http:header_map_lib", + "//source/common/http:headers_lib", + "//source/extensions/filters/http/adaptive_concurrency:adaptive_concurrency_filter_lib", + "//source/extensions/filters/http/adaptive_concurrency/concurrency_controller:concurrency_controller_lib", + "//test/mocks/http:http_mocks", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter_test.cc b/test/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter_test.cc new file mode 100644 index 000000000000..60ad871dc3e5 --- /dev/null +++ b/test/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter_test.cc @@ -0,0 +1,98 @@ +#include + +#include "extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h" +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h" + +#include "test/mocks/http/mocks.h" +#include "test/mocks/stream_info/mocks.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::Return; + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { +namespace { + +using ConcurrencyController::RequestForwardingAction; + +class MockConcurrencyController : public ConcurrencyController::ConcurrencyController { +public: + MOCK_METHOD0(forwardingDecision, RequestForwardingAction()); + MOCK_METHOD1(recordLatencySample, void(const std::chrono::nanoseconds&)); +}; + +class AdaptiveConcurrencyFilterTest : public testing::Test { +public: + AdaptiveConcurrencyFilterTest() { + filter_.reset(); + + const envoy::config::filter::http::adaptive_concurrency::v2alpha::AdaptiveConcurrency config; + auto config_ptr = std::make_shared( + config, runtime_, "testprefix.", stats_, time_system_); + + filter_ = std::make_unique(config_ptr, controller_); + filter_->setDecoderFilterCallbacks(decoder_callbacks_); + } + + std::unique_ptr filter_; + Event::SimulatedTimeSystem time_system_; + Stats::IsolatedStoreImpl stats_; + NiceMock runtime_; + std::shared_ptr controller_{new MockConcurrencyController()}; + NiceMock decoder_callbacks_; +}; + +TEST_F(AdaptiveConcurrencyFilterTest, DecodeHeadersTestForwarding) { + Http::TestHeaderMapImpl request_headers; + + EXPECT_CALL(*controller_, forwardingDecision()) + .WillOnce(Return(RequestForwardingAction::Forward)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, false)); + + Buffer::OwnedImpl request_body; + EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(request_body, false)); + + Http::TestHeaderMapImpl request_trailers; + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers)); +} + +TEST_F(AdaptiveConcurrencyFilterTest, DecodeHeadersTestBlock) { + Http::TestHeaderMapImpl request_headers; + + EXPECT_CALL(*controller_, forwardingDecision()).WillOnce(Return(RequestForwardingAction::Block)); + EXPECT_CALL(decoder_callbacks_, sendLocalReply(Http::Code::ServiceUnavailable, _, _, _, _)); + EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + filter_->decodeHeaders(request_headers, true)); +} + +TEST_F(AdaptiveConcurrencyFilterTest, EncodeHeadersValidTest) { + auto mt = time_system_.monotonicTime(); + time_system_.setMonotonicTime(mt + std::chrono::milliseconds(123)); + + // Get the filter to record the request start time via decode. + Http::TestHeaderMapImpl request_headers; + EXPECT_CALL(*controller_, forwardingDecision()) + .WillOnce(Return(RequestForwardingAction::Forward)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, true)); + + const std::chrono::nanoseconds advance_time = std::chrono::milliseconds(42); + mt = time_system_.monotonicTime(); + time_system_.setMonotonicTime(mt + advance_time); + + Http::TestHeaderMapImpl response_headers; + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers, false)); + EXPECT_CALL(*controller_, recordLatencySample(advance_time)); + filter_->encodeComplete(); +} + +} // namespace +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy