diff --git a/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD index b58f88c787ba..a02fc542756c 100644 --- a/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD +++ b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/BUILD @@ -3,13 +3,17 @@ load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal", "ap licenses(["notice"]) # Apache 2 api_proto_package( - deps = ["//envoy/api/v2/core"], + deps = [ + "//envoy/api/v3alpha/core", + "//envoy/type", + ], ) api_proto_library_internal( name = "adaptive_concurrency", srcs = ["adaptive_concurrency.proto"], deps = [ - "//envoy/api/v2/core:base", + "//envoy/api/v3alpha/core:base", + "//envoy/type:percent", ], ) diff --git a/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.proto b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.proto index 303b681471f4..9b03169f7dd0 100644 --- a/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.proto +++ b/api/envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.proto @@ -6,5 +6,59 @@ option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurren option java_outer_classname = "AdaptiveConcurrencyProto"; option java_multiple_files = true; +import "envoy/type/percent.proto"; + +import "google/protobuf/duration.proto"; +import "google/api/annotations.proto"; +import "google/protobuf/wrappers.proto"; + +import "validate/validate.proto"; + +// Configuration parameters for the gradient controller. +message GradientControllerConfig { + // The percentile to use when summarizing aggregated samples. Defaults to p50. + envoy.type.Percent sample_aggregate_percentile = 1; + + // Parameters controlling the periodic recalculation of the concurrency limit from sampled request + // latencies. + message ConcurrencyLimitCalculationParams { + // The maximum value the gradient is allowed to take. This influences how aggressively the + // concurrency limit can increase. Defaults to 2.0. + google.protobuf.DoubleValue max_gradient = 1 [(validate.rules).double.gt = 1.0]; + + // The allowed upper-bound on the calculated concurrency limit. Defaults to 1000. + google.protobuf.UInt32Value max_concurrency_limit = 2 [(validate.rules).uint32.gt = 0]; + + // The period of time samples are taken to recalculate the concurrency limit. + google.protobuf.Duration concurrency_update_interval = 3 [(validate.rules).duration = { + required: true, + gt: {seconds: 0} + }]; + } + ConcurrencyLimitCalculationParams concurrency_limit_params = 2 + [(validate.rules).message.required = true]; + + // Parameters controlling the periodic minRTT recalculation. + message MinimumRTTCalculationParams { + // The time interval between recalculating the minimum request round-trip time. + google.protobuf.Duration interval = 1 [(validate.rules).duration = { + required: true, + gt: {seconds: 0} + }]; + + // The number of requests to aggregate/sample during the minRTT recalculation window before + // updating. Defaults to 50. + google.protobuf.UInt32Value request_count = 2 [(validate.rules).uint32.gt = 0]; + }; + MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message.required = true]; +} + message AdaptiveConcurrency { + oneof concurrency_controller_config { + option (validate.required) = true; + + // Gradient concurrency control will be used. + GradientControllerConfig gradient_controller_config = 1 + [(validate.rules).message.required = true]; + } } diff --git a/api/envoy/config/filter/http/adaptive_concurrency/v3alpha/BUILD b/api/envoy/config/filter/http/adaptive_concurrency/v3alpha/BUILD deleted file mode 100644 index f9813a6a0829..000000000000 --- a/api/envoy/config/filter/http/adaptive_concurrency/v3alpha/BUILD +++ /dev/null @@ -1,15 +0,0 @@ -load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal", "api_proto_package") - -licenses(["notice"]) # Apache 2 - -api_proto_package( - deps = ["//envoy/api/v3alpha/core"], -) - -api_proto_library_internal( - name = "adaptive_concurrency", - srcs = ["adaptive_concurrency.proto"], - deps = [ - "//envoy/api/v3alpha/core:base", - ], -) diff --git a/api/envoy/config/filter/http/adaptive_concurrency/v3alpha/adaptive_concurrency.proto b/api/envoy/config/filter/http/adaptive_concurrency/v3alpha/adaptive_concurrency.proto deleted file mode 100644 index 3d57196f9db7..000000000000 --- a/api/envoy/config/filter/http/adaptive_concurrency/v3alpha/adaptive_concurrency.proto +++ /dev/null @@ -1,10 +0,0 @@ -syntax = "proto3"; - -package envoy.config.filter.http.adaptive_concurrency.v3alpha; - -option java_package = "io.envoyproxy.envoy.config.filter.http.adaptive_concurrency.v3alpha"; -option java_outer_classname = "AdaptiveConcurrencyProto"; -option java_multiple_files = true; - -message AdaptiveConcurrency { -} diff --git a/source/common/common/cleanup.h b/source/common/common/cleanup.h index e7039ef069ce..1eafa29d44d6 100644 --- a/source/common/common/cleanup.h +++ b/source/common/common/cleanup.h @@ -10,11 +10,19 @@ namespace Envoy { // RAII cleanup via functor. class Cleanup { public: - Cleanup(std::function f) : f_(std::move(f)) {} + Cleanup(std::function f) : f_(std::move(f)), cancelled_(false) {} ~Cleanup() { f_(); } + void cancel() { + cancelled_ = true; + f_ = []() {}; + } + + bool cancelled() { return cancelled_; } + private: std::function f_; + bool cancelled_; }; // RAII helper class to add an element to an std::list on construction and erase diff --git a/source/common/protobuf/utility.h b/source/common/protobuf/utility.h index 00ae4bceb66c..1f29ea1d7921 100644 --- a/source/common/protobuf/utility.h +++ b/source/common/protobuf/utility.h @@ -84,6 +84,15 @@ uint64_t fractionalPercentDenominatorToInt( } // namespace ProtobufPercentHelper } // namespace Envoy +// Convert an envoy::api::v2::core::Percent to a double or a default. +// @param message supplies the proto message containing the field. +// @param field_name supplies the field name in the message. +// @param default_value supplies the default if the field is not present. +#define PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(message, field_name, default_value) \ + (!std::isnan((message).field_name().value()) \ + ? (message).has_##field_name() ? (message).field_name().value() : default_value \ + : throw EnvoyException(fmt::format("Value not in the range of 0..100 range."))) + // Convert an envoy::api::v2::core::Percent to a rounded integer or a default. // @param message supplies the proto message containing the field. // @param field_name supplies the field name in the message. diff --git a/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.cc b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.cc index 1ec4dd8247e2..076ff9c57b60 100644 --- a/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.cc +++ b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.cc @@ -32,13 +32,35 @@ Http::FilterHeadersStatus AdaptiveConcurrencyFilter::decodeHeaders(Http::HeaderM return Http::FilterHeadersStatus::StopIteration; } - rq_start_time_ = config_->timeSource().monotonicTime(); + // When the deferred_sample_task_ object is destroyed, the time difference between its destruction + // and the request start time is measured as the request latency. This value is sampled by the + // concurrency controller either when encoding is complete or during destruction of this filter + // object. + deferred_sample_task_ = + std::make_unique([this, rq_start_time = config_->timeSource().monotonicTime()]() { + const auto now = config_->timeSource().monotonicTime(); + const std::chrono::nanoseconds rq_latency = now - rq_start_time; + controller_->recordLatencySample(rq_latency); + }); + return Http::FilterHeadersStatus::Continue; } void AdaptiveConcurrencyFilter::encodeComplete() { - const auto rq_latency = config_->timeSource().monotonicTime() - rq_start_time_; - controller_->recordLatencySample(rq_latency); + ASSERT(deferred_sample_task_); + deferred_sample_task_.reset(); +} + +void AdaptiveConcurrencyFilter::onDestroy() { + if (deferred_sample_task_) { + // The sampling task hasn't been destroyed yet, so this implies we did not complete encoding. + // Let's stop the sampling from happening and perform request cleanup inside the controller. + // + // TODO (tonya11en): Return some RAII handle from the concurrency controller that performs this + // logic as part of its lifecycle. + deferred_sample_task_->cancel(); + controller_->cancelLatencySample(); + } } } // namespace AdaptiveConcurrency diff --git a/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h index 88070180272b..0ebf7479b008 100644 --- a/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h +++ b/source/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h @@ -11,6 +11,8 @@ #include "envoy/stats/scope.h" #include "envoy/stats/stats_macros.h" +#include "common/common/cleanup.h" + #include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h" #include "extensions/filters/http/common/pass_through_filter.h" @@ -57,12 +59,12 @@ class AdaptiveConcurrencyFilter : public Http::PassThroughFilter, // Http::StreamEncoderFilter void encodeComplete() override; + void onDestroy() override; private: AdaptiveConcurrencyFilterConfigSharedPtr config_; const ConcurrencyControllerSharedPtr controller_; - MonotonicTime rq_start_time_; - std::unique_ptr forwarding_action_; + std::unique_ptr deferred_sample_task_; }; } // namespace AdaptiveConcurrency diff --git a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD index d213690d63c6..604221865c11 100644 --- a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD +++ b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD @@ -14,10 +14,20 @@ envoy_package() envoy_cc_library( name = "concurrency_controller_lib", - srcs = [], + srcs = ["gradient_controller.cc"], hdrs = [ "concurrency_controller.h", + "gradient_controller.h", + ], + external_deps = [ + "libcircllhist", ], deps = [ + "//source/common/event:dispatcher_lib", + "//source/common/protobuf", + "//source/common/runtime:runtime_lib", + "//source/common/stats:isolated_store_lib", + "//source/common/stats:stats_lib", + "@envoy_api//envoy/config/filter/http/adaptive_concurrency/v2alpha:adaptive_concurrency_cc", ], ) diff --git a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h index 0c0dbe456c7d..20342c0bd6cf 100644 --- a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h +++ b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h @@ -43,7 +43,18 @@ class ConcurrencyController { * * @param rq_latency is the clocked round-trip time for the request. */ - virtual void recordLatencySample(const std::chrono::nanoseconds& rq_latency) PURE; + virtual void recordLatencySample(std::chrono::nanoseconds rq_latency) PURE; + + /** + * Omit sampling an outstanding request and update the internal state of the controller to reflect + * request completion. + */ + virtual void cancelLatencySample() PURE; + + /** + * Returns the current concurrency limit. + */ + virtual uint32_t concurrencyLimit() const PURE; }; } // namespace ConcurrencyController diff --git a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.cc b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.cc new file mode 100644 index 000000000000..3391c55fb6c3 --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.cc @@ -0,0 +1,186 @@ +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.h" + +#include +#include + +#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h" +#include "envoy/event/dispatcher.h" +#include "envoy/runtime/runtime.h" +#include "envoy/stats/stats.h" + +#include "common/common/cleanup.h" +#include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" + +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h" + +#include "absl/synchronization/mutex.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { +namespace ConcurrencyController { + +GradientControllerConfig::GradientControllerConfig( + const envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig& + proto_config) + : min_rtt_calc_interval_(std::chrono::milliseconds( + DurationUtil::durationToMilliseconds(proto_config.min_rtt_calc_params().interval()))), + sample_rtt_calc_interval_(std::chrono::milliseconds(DurationUtil::durationToMilliseconds( + proto_config.concurrency_limit_params().concurrency_update_interval()))), + max_concurrency_limit_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( + proto_config.concurrency_limit_params(), max_concurrency_limit, 1000)), + min_rtt_aggregate_request_count_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), request_count, 50)), + max_gradient_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.concurrency_limit_params(), + max_gradient, 2.0)), + sample_aggregate_percentile_( + PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config, sample_aggregate_percentile, 50) / + 100.0) {} + +GradientController::GradientController(GradientControllerConfigSharedPtr config, + Event::Dispatcher& dispatcher, Runtime::Loader&, + const std::string& stats_prefix, Stats::Scope& scope) + : config_(std::move(config)), dispatcher_(dispatcher), scope_(scope), + stats_(generateStats(scope_, stats_prefix)), deferred_limit_value_(1), num_rq_outstanding_(0), + concurrency_limit_(1), latency_sample_hist_(hist_fast_alloc(), hist_free) { + min_rtt_calc_timer_ = dispatcher_.createTimer([this]() -> void { enterMinRTTSamplingWindow(); }); + + sample_reset_timer_ = dispatcher_.createTimer([this]() -> void { + if (inMinRTTSamplingWindow()) { + // The minRTT sampling window started since the sample reset timer was enabled last. Since the + // minRTT value is being calculated, let's give up on this timer to avoid blocking the + // dispatcher thread and rely on it being enabled again as part of the minRTT calculation. + return; + } + + { + absl::MutexLock ml(&sample_mutation_mtx_); + resetSampleWindow(); + } + + sample_reset_timer_->enableTimer(config_->sampleRTTCalcInterval()); + }); + + sample_reset_timer_->enableTimer(config_->sampleRTTCalcInterval()); + stats_.concurrency_limit_.set(concurrency_limit_.load()); +} + +GradientControllerStats GradientController::generateStats(Stats::Scope& scope, + const std::string& stats_prefix) { + return {ALL_GRADIENT_CONTROLLER_STATS(POOL_GAUGE_PREFIX(scope, stats_prefix))}; +} + +void GradientController::enterMinRTTSamplingWindow() { + absl::MutexLock ml(&sample_mutation_mtx_); + + // Set the minRTT flag to indicate we're gathering samples to update the value. This will + // prevent the sample window from resetting until enough requests are gathered to complete the + // recalculation. + deferred_limit_value_.store(concurrencyLimit()); + updateConcurrencyLimit(1); + + // Throw away any latency samples from before the recalculation window as it may not represent + // the minRTT. + hist_clear(latency_sample_hist_.get()); +} + +void GradientController::updateMinRTT() { + ASSERT(inMinRTTSamplingWindow()); + + { + absl::MutexLock ml(&sample_mutation_mtx_); + min_rtt_ = processLatencySamplesAndClear(); + stats_.min_rtt_msecs_.set( + std::chrono::duration_cast(min_rtt_).count()); + updateConcurrencyLimit(deferred_limit_value_.load()); + deferred_limit_value_.store(0); + } + + min_rtt_calc_timer_->enableTimer(config_->minRTTCalcInterval()); +} + +void GradientController::resetSampleWindow() { + // The sampling window must not be reset while sampling for the new minRTT value. + ASSERT(!inMinRTTSamplingWindow()); + + if (hist_sample_count(latency_sample_hist_.get()) == 0) { + return; + } + + sample_rtt_ = processLatencySamplesAndClear(); + updateConcurrencyLimit(calculateNewLimit()); +} + +std::chrono::microseconds GradientController::processLatencySamplesAndClear() { + const std::array quantile{config_->sampleAggregatePercentile()}; + std::array calculated_quantile; + hist_approx_quantile(latency_sample_hist_.get(), quantile.data(), 1, calculated_quantile.data()); + hist_clear(latency_sample_hist_.get()); + return std::chrono::microseconds(static_cast(calculated_quantile[0])); +} + +uint32_t GradientController::calculateNewLimit() { + // Calculate the gradient value, ensuring it remains below the configured maximum. + ASSERT(sample_rtt_.count() > 0); + const double raw_gradient = static_cast(min_rtt_.count()) / sample_rtt_.count(); + const double gradient = std::min(config_->maxGradient(), raw_gradient); + stats_.gradient_.set(gradient); + + const double limit = concurrencyLimit() * gradient; + const double burst_headroom = sqrt(limit); + stats_.burst_queue_size_.set(burst_headroom); + + // The final concurrency value factors in the burst headroom and must be clamped to keep the value + // in the range [1, configured_max]. + const auto clamp = [](int min, int max, int val) { return std::max(min, std::min(max, val)); }; + const uint32_t new_limit = limit + burst_headroom; + return clamp(1, config_->maxConcurrencyLimit(), new_limit); +} + +RequestForwardingAction GradientController::forwardingDecision() { + // Note that a race condition exists here which would allow more outstanding requests than the + // concurrency limit bounded by the number of worker threads. After loading num_rq_outstanding_ + // and before loading concurrency_limit_, another thread could potentially swoop in and modify + // num_rq_outstanding_, causing us to move forward with stale values and increment + // num_rq_outstanding_. + // + // TODO (tonya11en): Reconsider using a CAS loop here. + if (num_rq_outstanding_.load() < concurrencyLimit()) { + ++num_rq_outstanding_; + return RequestForwardingAction::Forward; + } + return RequestForwardingAction::Block; +} + +void GradientController::recordLatencySample(std::chrono::nanoseconds rq_latency) { + const uint32_t latency_usec = + std::chrono::duration_cast(rq_latency).count(); + ASSERT(num_rq_outstanding_.load() > 0); + --num_rq_outstanding_; + + uint32_t sample_count; + { + absl::MutexLock ml(&sample_mutation_mtx_); + hist_insert(latency_sample_hist_.get(), latency_usec, 1); + sample_count = hist_sample_count(latency_sample_hist_.get()); + } + + if (inMinRTTSamplingWindow() && sample_count >= config_->minRTTAggregateRequestCount()) { + // This sample has pushed the request count over the request count requirement for the minRTT + // recalculation. It must now be finished. + updateMinRTT(); + } +} + +void GradientController::cancelLatencySample() { + ASSERT(num_rq_outstanding_.load() > 0); + --num_rq_outstanding_; +} + +} // namespace ConcurrencyController +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.h b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.h new file mode 100644 index 000000000000..a7e27f311467 --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.h @@ -0,0 +1,205 @@ +#pragma once + +#include +#include + +#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h" +#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.validate.h" +#include "envoy/event/dispatcher.h" +#include "envoy/runtime/runtime.h" +#include "envoy/stats/stats_macros.h" + +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h" + +#include "absl/base/thread_annotations.h" +#include "absl/synchronization/mutex.h" +#include "circllhist.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { +namespace ConcurrencyController { + +/** + * All stats for the gradient controller. + */ +#define ALL_GRADIENT_CONTROLLER_STATS(GAUGE) \ + GAUGE(concurrency_limit, NeverImport) \ + GAUGE(gradient, NeverImport) \ + GAUGE(burst_queue_size, NeverImport) \ + GAUGE(min_rtt_msecs, NeverImport) + +/** + * Wrapper struct for gradient controller stats. @see stats_macros.h + */ +struct GradientControllerStats { + ALL_GRADIENT_CONTROLLER_STATS(GENERATE_GAUGE_STRUCT) +}; + +class GradientControllerConfig { +public: + GradientControllerConfig( + const envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig& + proto_config); + + std::chrono::milliseconds minRTTCalcInterval() const { return min_rtt_calc_interval_; } + std::chrono::milliseconds sampleRTTCalcInterval() const { return sample_rtt_calc_interval_; } + uint32_t maxConcurrencyLimit() const { return max_concurrency_limit_; } + uint32_t minRTTAggregateRequestCount() const { return min_rtt_aggregate_request_count_; } + double maxGradient() const { return max_gradient_; } + double sampleAggregatePercentile() const { return sample_aggregate_percentile_; } + +private: + // The measured request round-trip time under ideal conditions. + const std::chrono::milliseconds min_rtt_calc_interval_; + + // The measured sample round-trip time from the previous time window. + const std::chrono::milliseconds sample_rtt_calc_interval_; + + // The maximum allowed concurrency value. + const uint32_t max_concurrency_limit_; + + // The number of requests to aggregate/sample during the minRTT recalculation. + const uint32_t min_rtt_aggregate_request_count_; + + // The maximum value the gradient may take. + const double max_gradient_; + + // The percentile value considered when processing samples. + const double sample_aggregate_percentile_; +}; +using GradientControllerConfigSharedPtr = std::shared_ptr; + +/** + * A concurrency controller that implements a variation of the Gradient algorithm described in: + * + * https://medium.com/@NetflixTechBlog/performance-under-load-3e6fa9a60581 + * + * This is used to control the allowed request concurrency limit in the adaptive concurrency control + * filter. + * + * The algorithm: + * ============== + * An ideal round-trip time (minRTT) is measured periodically by only allowing a single outstanding + * request at a time and measuring the round-trip time to the upstream. This information is then + * used in the calculation of a number called the gradient, using time-sampled latencies + * (sampleRTT): + * + * gradient = minRTT / sampleRTT + * + * This gradient value has a useful property, such that it decreases as the sampled latencies + * increase. The value is then used to periodically update the concurrency limit via: + * + * limit = old_limit * gradient + * new_limit = limit + headroom + * + * The headroom value allows for request bursts and is also the driving factor behind increasing the + * concurrency limit when the sampleRTT is in the same ballpark as the minRTT. This value must be + * present in the calculation, since it forces the concurrency limit to increase until there is a + * deviation from the minRTT latency. In its absence, the concurrency limit could remain stagnant at + * an unnecessarily small value if sampleRTT ~= minRTT. Therefore, the headroom value is + * unconfigurable and is set to the square-root of the new limit. + * + * Sampling: + * ========= + * The controller makes use of latency samples to either determine the minRTT or the sampleRTT which + * is used to periodically update the concurrency limit. Each calculation occurs at separate + * configurable frequencies and they may not occur at the same time. To prevent this, there exists a + * concept of mutually exclusive sampling windows. + * + * When the gradient controller is instantiated, it starts inside of a minRTT calculation window + * (indicated by inMinRTTSamplingWindow() returning true) and the concurrency limit is pinned to 1. + * This window lasts until the configured number of requests is received, the minRTT value is + * updated, and the minRTT value is set by a single worker thread. To prevent sampleRTT calculations + * from triggering during this window, the update window mutex is held. Since it's necessary for a + * worker thread to know which update window update window mutex is held for, they check the state + * of inMinRTTSamplingWindow() after each sample. When the minRTT calculation is complete, a timer + * is set to trigger the next minRTT sampling window by the worker thread who updates the minRTT + * value. + * + * If the controller is not in a minRTT sampling window, it's possible that the controller is in a + * sampleRTT calculation window. In this, all of the latency samples are consolidated into a + * configurable quantile value to represent the measured latencies. This quantile value sets + * sampleRTT and the concurrency limit is updated as described in the algorithm section above. + * + * When not in a sampling window, the controller is simply servicing the adaptive concurrency filter + * via the public functions. + * + * Locking: + * ======== + * There are 2 mutually exclusive calculation windows, so the sample mutation mutex is held to + * prevent the overlap of these windows. It is necessary for a worker thread to know specifically if + * the controller is inside of a minRTT recalculation window during the recording of a latency + * sample, so this extra bit of information is stored in inMinRTTSamplingWindow(). + */ +class GradientController : public ConcurrencyController { +public: + GradientController(GradientControllerConfigSharedPtr config, Event::Dispatcher& dispatcher, + Runtime::Loader& runtime, const std::string& stats_prefix, + Stats::Scope& scope); + + // ConcurrencyController. + RequestForwardingAction forwardingDecision() override; + void recordLatencySample(std::chrono::nanoseconds rq_latency) override; + void cancelLatencySample() override; + uint32_t concurrencyLimit() const override { return concurrency_limit_.load(); } + +private: + static GradientControllerStats generateStats(Stats::Scope& scope, + const std::string& stats_prefix); + void updateMinRTT(); + std::chrono::microseconds processLatencySamplesAndClear() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); + uint32_t calculateNewLimit() ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); + void enterMinRTTSamplingWindow(); + bool inMinRTTSamplingWindow() const { return deferred_limit_value_.load() > 0; } + void resetSampleWindow() ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); + void updateConcurrencyLimit(const uint32_t new_limit) { + concurrency_limit_.store(new_limit); + stats_.concurrency_limit_.set(concurrency_limit_.load()); + } + + const GradientControllerConfigSharedPtr config_; + Event::Dispatcher& dispatcher_; + Stats::Scope& scope_; + GradientControllerStats stats_; + + // Protects data related to latency sampling and RTT values. In addition to protecting the latency + // sample histogram, the mutex ensures that the minRTT calculation window and the sample window + // (where the new concurrency limit is determined) do not overlap. + absl::Mutex sample_mutation_mtx_; + + // Stores the value of the concurrency limit prior to entering the minRTT update window. If this + // is non-zero, then we are actively in the minRTT sampling window. + std::atomic deferred_limit_value_; + + // Stores the expected upstream latency value under ideal conditions. This is the numerator in the + // gradient value explained above. + std::chrono::nanoseconds min_rtt_; + std::chrono::nanoseconds sample_rtt_ ABSL_GUARDED_BY(sample_mutation_mtx_); + + // Tracks the count of requests that have been forwarded whose replies have + // not been sampled yet. Atomicity is required because this variable is used to make the + // forwarding decision without locking. + std::atomic num_rq_outstanding_; + + // Stores the current concurrency limit. Atomicity is required because this variable is used to + // make the forwarding decision without locking. + std::atomic concurrency_limit_; + + // Stores all sampled latencies and provides percentile estimations when using the sampled data to + // calculate a new concurrency limit. + std::unique_ptr + latency_sample_hist_ ABSL_GUARDED_BY(sample_mutation_mtx_); + + Event::TimerPtr min_rtt_calc_timer_; + Event::TimerPtr sample_reset_timer_; +}; +using GradientControllerSharedPtr = std::shared_ptr; + +} // namespace ConcurrencyController +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/common/common/cleanup_test.cc b/test/common/common/cleanup_test.cc index 7b666c163caf..98a590308727 100644 --- a/test/common/common/cleanup_test.cc +++ b/test/common/common/cleanup_test.cc @@ -13,6 +13,18 @@ TEST(CleanupTest, ScopeExitCallback) { EXPECT_TRUE(callback_fired); } +TEST(CleanupTest, Cancel) { + bool callback_fired = false; + { + Cleanup cleanup([&callback_fired] { callback_fired = true; }); + EXPECT_FALSE(cleanup.cancelled()); + cleanup.cancel(); + EXPECT_FALSE(callback_fired); + EXPECT_TRUE(cleanup.cancelled()); + } + EXPECT_FALSE(callback_fired); +} + TEST(RaiiListElementTest, DeleteOnDestruction) { std::list l; diff --git a/test/common/protobuf/utility_test.cc b/test/common/protobuf/utility_test.cc index 539dfb6a28f6..945bec99d5f1 100644 --- a/test/common/protobuf/utility_test.cc +++ b/test/common/protobuf/utility_test.cc @@ -37,6 +37,8 @@ TEST_F(ProtobufUtilityTest, convertPercentNaN) { EXPECT_THROW(PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(common_config_, healthy_panic_threshold, 100, 50), EnvoyException); + EXPECT_THROW(PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(common_config_, healthy_panic_threshold, 0.5), + EnvoyException); } namespace ProtobufPercentHelper { diff --git a/test/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter_test.cc b/test/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter_test.cc index 60ad871dc3e5..f859c031e660 100644 --- a/test/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter_test.cc +++ b/test/extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter_test.cc @@ -24,28 +24,35 @@ using ConcurrencyController::RequestForwardingAction; class MockConcurrencyController : public ConcurrencyController::ConcurrencyController { public: MOCK_METHOD0(forwardingDecision, RequestForwardingAction()); - MOCK_METHOD1(recordLatencySample, void(const std::chrono::nanoseconds&)); + MOCK_METHOD0(cancelLatencySample, void()); + MOCK_METHOD1(recordLatencySample, void(std::chrono::nanoseconds)); + + uint32_t concurrencyLimit() const override { return 0; } }; class AdaptiveConcurrencyFilterTest : public testing::Test { public: - AdaptiveConcurrencyFilterTest() { - filter_.reset(); + AdaptiveConcurrencyFilterTest() = default; + void SetUp() override { const envoy::config::filter::http::adaptive_concurrency::v2alpha::AdaptiveConcurrency config; auto config_ptr = std::make_shared( config, runtime_, "testprefix.", stats_, time_system_); filter_ = std::make_unique(config_ptr, controller_); filter_->setDecoderFilterCallbacks(decoder_callbacks_); + filter_->setEncoderFilterCallbacks(encoder_callbacks_); } - std::unique_ptr filter_; + void TearDown() override { filter_.reset(); } + Event::SimulatedTimeSystem time_system_; Stats::IsolatedStoreImpl stats_; NiceMock runtime_; std::shared_ptr controller_{new MockConcurrencyController()}; NiceMock decoder_callbacks_; + NiceMock encoder_callbacks_; + std::unique_ptr filter_; }; TEST_F(AdaptiveConcurrencyFilterTest, DecodeHeadersTestForwarding) { @@ -53,6 +60,8 @@ TEST_F(AdaptiveConcurrencyFilterTest, DecodeHeadersTestForwarding) { EXPECT_CALL(*controller_, forwardingDecision()) .WillOnce(Return(RequestForwardingAction::Forward)); + EXPECT_CALL(*controller_, recordLatencySample(_)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, false)); Buffer::OwnedImpl request_body; @@ -71,9 +80,60 @@ TEST_F(AdaptiveConcurrencyFilterTest, DecodeHeadersTestBlock) { filter_->decodeHeaders(request_headers, true)); } +TEST_F(AdaptiveConcurrencyFilterTest, RecordSampleInDestructor) { + // Verify that the request latency is always sampled even if encodeComplete() is never called. + EXPECT_CALL(*controller_, forwardingDecision()) + .WillOnce(Return(RequestForwardingAction::Forward)); + Http::TestHeaderMapImpl request_headers; + filter_->decodeHeaders(request_headers, true); + + EXPECT_CALL(*controller_, recordLatencySample(_)); + filter_.reset(); +} + +TEST_F(AdaptiveConcurrencyFilterTest, RecordSampleOmission) { + // Verify that the request latency is not sampled if forwardingDecision blocks the request. + EXPECT_CALL(*controller_, forwardingDecision()).WillOnce(Return(RequestForwardingAction::Block)); + Http::TestHeaderMapImpl request_headers; + filter_->decodeHeaders(request_headers, true); + + filter_.reset(); +} + +TEST_F(AdaptiveConcurrencyFilterTest, OnDestroyCleanupResetTest) { + // Get the filter to record the request start time via decode. + Http::TestHeaderMapImpl request_headers; + EXPECT_CALL(*controller_, forwardingDecision()) + .WillOnce(Return(RequestForwardingAction::Forward)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, true)); + + EXPECT_CALL(*controller_, cancelLatencySample()); + + // Encode step is not performed prior to destruction. + filter_->onDestroy(); +} + +TEST_F(AdaptiveConcurrencyFilterTest, OnDestroyCleanupTest) { + // Get the filter to record the request start time via decode. + Http::TestHeaderMapImpl request_headers; + EXPECT_CALL(*controller_, forwardingDecision()) + .WillOnce(Return(RequestForwardingAction::Forward)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, true)); + + const auto advance_time = std::chrono::nanoseconds(42); + time_system_.sleep(advance_time); + + Http::TestHeaderMapImpl response_headers; + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->encodeHeaders(response_headers, false)); + EXPECT_CALL(*controller_, recordLatencySample(advance_time)); + filter_->encodeComplete(); + + filter_->onDestroy(); +} + TEST_F(AdaptiveConcurrencyFilterTest, EncodeHeadersValidTest) { auto mt = time_system_.monotonicTime(); - time_system_.setMonotonicTime(mt + std::chrono::milliseconds(123)); + time_system_.setMonotonicTime(mt + std::chrono::nanoseconds(123)); // Get the filter to record the request start time via decode. Http::TestHeaderMapImpl request_headers; @@ -81,7 +141,7 @@ TEST_F(AdaptiveConcurrencyFilterTest, EncodeHeadersValidTest) { .WillOnce(Return(RequestForwardingAction::Forward)); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, true)); - const std::chrono::nanoseconds advance_time = std::chrono::milliseconds(42); + const auto advance_time = std::chrono::nanoseconds(42); mt = time_system_.monotonicTime(); time_system_.setMonotonicTime(mt + advance_time); diff --git a/test/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD b/test/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD new file mode 100644 index 000000000000..eda772937cd7 --- /dev/null +++ b/test/extensions/filters/http/adaptive_concurrency/concurrency_controller/BUILD @@ -0,0 +1,28 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test_library", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +envoy_package() + +envoy_extension_cc_test( + name = "gradient_controller_test", + srcs = ["gradient_controller_test.cc"], + extension_name = "envoy.filters.http.adaptive_concurrency", + deps = [ + "//source/common/stats:isolated_store_lib", + "//source/extensions/filters/http/adaptive_concurrency:adaptive_concurrency_filter_lib", + "//source/extensions/filters/http/adaptive_concurrency/concurrency_controller:concurrency_controller_lib", + "//test/mocks/event:event_mocks", + "//test/mocks/runtime:runtime_mocks", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller_test.cc b/test/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller_test.cc new file mode 100644 index 000000000000..1a523df9730a --- /dev/null +++ b/test/extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller_test.cc @@ -0,0 +1,497 @@ +#include +#include + +#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.h" +#include "envoy/config/filter/http/adaptive_concurrency/v2alpha/adaptive_concurrency.pb.validate.h" + +#include "common/stats/isolated_store_impl.h" + +#include "extensions/filters/http/adaptive_concurrency/adaptive_concurrency_filter.h" +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/concurrency_controller.h" +#include "extensions/filters/http/adaptive_concurrency/concurrency_controller/gradient_controller.h" + +#include "test/mocks/event/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::AllOf; +using testing::Ge; +using testing::Le; +using testing::NiceMock; +using testing::Return; + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { +namespace ConcurrencyController { +namespace { + +class GradientControllerConfigTest : public testing::Test { +public: + GradientControllerConfigTest() = default; +}; + +class GradientControllerTest : public testing::Test { +public: + GradientControllerTest() + : api_(Api::createApiForTest(time_system_)), dispatcher_(api_->allocateDispatcher()) {} + + GradientControllerSharedPtr makeController(const std::string& yaml_config) { + return std::make_shared(makeConfig(yaml_config), *dispatcher_, runtime_, + "test_prefix.", stats_); + } + +protected: + GradientControllerConfigSharedPtr makeConfig(const std::string& yaml_config) { + envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig proto = + TestUtility::parseYaml< + envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig>( + yaml_config); + return std::make_shared(proto); + } + + // Helper function that will attempt to pull forwarding decisions. + void tryForward(const GradientControllerSharedPtr& controller, + const bool expect_forward_response) { + const auto expected_resp = + expect_forward_response ? RequestForwardingAction::Forward : RequestForwardingAction::Block; + EXPECT_EQ(expected_resp, controller->forwardingDecision()); + } + + // Gets the controller past the initial minRTT stage. + void advancePastMinRTTStage(const GradientControllerSharedPtr& controller, + const std::string& yaml_config, + std::chrono::milliseconds latency = std::chrono::milliseconds(5)) { + const auto config = makeConfig(yaml_config); + for (uint32_t ii = 0; ii <= config->minRTTAggregateRequestCount(); ++ii) { + tryForward(controller, true); + controller->recordLatencySample(latency); + } + } + + Event::SimulatedTimeSystem time_system_; + Stats::IsolatedStoreImpl stats_; + NiceMock runtime_; + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; +}; + +TEST_F(GradientControllerConfigTest, BasicTest) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 42 +concurrency_limit_params: + max_gradient: 2.1 + max_concurrency_limit: 1337 + concurrency_update_interval: + nanos: 123000000 +min_rtt_calc_params: + interval: + seconds: 31 + request_count: 52 +)EOF"; + + envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig proto = + TestUtility::parseYaml< + envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig>( + yaml); + GradientControllerConfig config(proto); + + EXPECT_EQ(config.minRTTCalcInterval(), std::chrono::seconds(31)); + EXPECT_EQ(config.sampleRTTCalcInterval(), std::chrono::milliseconds(123)); + EXPECT_EQ(config.maxConcurrencyLimit(), 1337); + EXPECT_EQ(config.minRTTAggregateRequestCount(), 52); + EXPECT_EQ(config.maxGradient(), 2.1); + EXPECT_EQ(config.sampleAggregatePercentile(), 0.42); +} + +TEST_F(GradientControllerConfigTest, DefaultValuesTest) { + const std::string yaml = R"EOF( +concurrency_limit_params: + concurrency_update_interval: + nanos: 123000000 +min_rtt_calc_params: + interval: + seconds: 31 +)EOF"; + + envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig proto = + TestUtility::parseYaml< + envoy::config::filter::http::adaptive_concurrency::v2alpha::GradientControllerConfig>( + yaml); + GradientControllerConfig config(proto); + + EXPECT_EQ(config.minRTTCalcInterval(), std::chrono::seconds(31)); + EXPECT_EQ(config.sampleRTTCalcInterval(), std::chrono::milliseconds(123)); + EXPECT_EQ(config.maxConcurrencyLimit(), 1000); + EXPECT_EQ(config.minRTTAggregateRequestCount(), 50); + EXPECT_EQ(config.maxGradient(), 2.0); + EXPECT_EQ(config.sampleAggregatePercentile(), 0.5); +} + +TEST_F(GradientControllerTest, MinRTTLogicTest) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 2.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 50 +)EOF"; + + auto controller = makeController(yaml); + const auto min_rtt = std::chrono::milliseconds(13); + + // The controller should be measuring minRTT upon creation, so the concurrency window is 1. + EXPECT_EQ(controller->concurrencyLimit(), 1); + tryForward(controller, true); + tryForward(controller, false); + tryForward(controller, false); + controller->recordLatencySample(min_rtt); + + // 49 more requests should cause the minRTT to be done calculating. + for (int ii = 0; ii < 49; ++ii) { + EXPECT_EQ(controller->concurrencyLimit(), 1); + tryForward(controller, true); + tryForward(controller, false); + controller->recordLatencySample(min_rtt); + } + + // Verify the minRTT value measured is accurate. + EXPECT_EQ( + 13, stats_.gauge("test_prefix.min_rtt_msecs", Stats::Gauge::ImportMode::NeverImport).value()); +} + +TEST_F(GradientControllerTest, CancelLatencySample) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 2.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 5 +)EOF"; + + auto controller = makeController(yaml); + + for (int ii = 1; ii <= 5; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(ii)); + } + EXPECT_EQ( + 3, stats_.gauge("test_prefix.min_rtt_msecs", Stats::Gauge::ImportMode::NeverImport).value()); +} + +TEST_F(GradientControllerTest, SamplePercentileProcessTest) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 2.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 5 +)EOF"; + + auto controller = makeController(yaml); + + tryForward(controller, true); + tryForward(controller, false); + controller->cancelLatencySample(); + tryForward(controller, true); + tryForward(controller, false); +} + +TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 2.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 5 +)EOF"; + + auto controller = makeController(yaml); + EXPECT_EQ(controller->concurrencyLimit(), 1); + + // Force a minRTT of 5ms. + advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + EXPECT_EQ( + 5, stats_.gauge("test_prefix.min_rtt_msecs", Stats::Gauge::ImportMode::NeverImport).value()); + + // Ensure that the concurrency window increases on its own due to the headroom calculation. + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_GT(controller->concurrencyLimit(), 1); + + // Make it seem as if the recorded latencies are consistently lower than the measured minRTT. + // Ensure that it grows. + for (int recalcs = 0; recalcs < 10; ++recalcs) { + const auto last_concurrency = controller->concurrencyLimit(); + for (int ii = 1; ii <= 5; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(4)); + } + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_GT(controller->concurrencyLimit(), last_concurrency); + } + + // Verify that the concurrency limit can now shrink as necessary. + for (int recalcs = 0; recalcs < 10; ++recalcs) { + const auto last_concurrency = controller->concurrencyLimit(); + for (int ii = 1; ii <= 5; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(6)); + } + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_LT(controller->concurrencyLimit(), last_concurrency); + } +} + +TEST_F(GradientControllerTest, MaxGradientTest) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 3.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 5 +)EOF"; + + auto controller = makeController(yaml); + EXPECT_EQ(controller->concurrencyLimit(), 1); + + // Force a minRTT of 5 seconds. + advancePastMinRTTStage(controller, yaml, std::chrono::seconds(5)); + + // circllhist approximates the percentiles, so we can expect it to be within a certain range. + EXPECT_THAT( + stats_.gauge("test_prefix.min_rtt_msecs", Stats::Gauge::ImportMode::NeverImport).value(), + AllOf(Ge(4950), Le(5050))); + + // Now verify max gradient value by forcing dramatically faster latency measurements.. + for (int ii = 1; ii <= 5; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(4)); + } + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(3.0, + stats_.gauge("test_prefix.gradient", Stats::Gauge::ImportMode::NeverImport).value()); +} + +TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 3.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 5 +)EOF"; + + auto controller = makeController(yaml); + EXPECT_EQ(controller->concurrencyLimit(), 1); + + // Get initial minRTT measurement out of the way. + advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + + // Force the limit calculation to run a few times from some measurements. + for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { + const auto last_concurrency = controller->concurrencyLimit(); + for (int ii = 1; ii <= 5; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(4)); + } + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + // Verify the value is growing. + EXPECT_GT(controller->concurrencyLimit(), last_concurrency); + } + + const auto limit_val = controller->concurrencyLimit(); + + // Wait until the minRTT recalculation is triggered again and verify the limit drops. + time_system_.sleep(std::chrono::seconds(31)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(controller->concurrencyLimit(), 1); + + // 49 more requests should cause the minRTT to be done calculating. + for (int ii = 0; ii < 5; ++ii) { + EXPECT_EQ(controller->concurrencyLimit(), 1); + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(13)); + } + + // Check that we restored the old concurrency limit value. + EXPECT_EQ(limit_val, controller->concurrencyLimit()); +} + +TEST_F(GradientControllerTest, MinRTTRescheduleTest) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 3.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 5 +)EOF"; + + auto controller = makeController(yaml); + EXPECT_EQ(controller->concurrencyLimit(), 1); + + // Get initial minRTT measurement out of the way. + advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + + // Force the limit calculation to run a few times from some measurements. + for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { + const auto last_concurrency = controller->concurrencyLimit(); + for (int ii = 1; ii <= 5; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(4)); + } + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + // Verify the value is growing. + EXPECT_GT(controller->concurrencyLimit(), last_concurrency); + } + + // Wait until the minRTT recalculation is triggered again and verify the limit drops. + time_system_.sleep(std::chrono::seconds(31)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(controller->concurrencyLimit(), 1); + + // Verify sample recalculation doesn't occur during the minRTT window. + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(controller->concurrencyLimit(), 1); +} + +TEST_F(GradientControllerTest, NoSamplesTest) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 3.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 100000000 # 100ms +min_rtt_calc_params: + interval: + seconds: 30 + request_count: 5 +)EOF"; + + auto controller = makeController(yaml); + EXPECT_EQ(controller->concurrencyLimit(), 1); + + // Get minRTT measurement out of the way. + advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + + // Force the limit calculation to run a few times from some measurements. + for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { + const auto last_concurrency = controller->concurrencyLimit(); + for (int ii = 1; ii <= 5; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(4)); + } + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + // Verify the value is growing. + EXPECT_GT(controller->concurrencyLimit(), last_concurrency); + } + + // Now we make sure that the limit value doesn't change in the absence of samples. + for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { + const auto old_limit = controller->concurrencyLimit(); + time_system_.sleep(std::chrono::milliseconds(101)); + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(old_limit, controller->concurrencyLimit()); + } +} + +TEST_F(GradientControllerTest, TimerAccuracyTest) { + const std::string yaml = R"EOF( +sample_aggregate_percentile: + value: 50 +concurrency_limit_params: + max_gradient: 3.0 + max_concurrency_limit: + concurrency_update_interval: + nanos: 123000000 # 123ms +min_rtt_calc_params: + interval: + seconds: 45 + request_count: 5 +)EOF"; + + // Verify the configuration affects the timers that are kicked off. + NiceMock fake_dispatcher; + auto sample_timer = new NiceMock; + auto rtt_timer = new NiceMock; + + // Expect the sample timer to trigger start immediately upon controller creation. + EXPECT_CALL(fake_dispatcher, createTimer_(_)) + .Times(2) + .WillOnce(Return(rtt_timer)) + .WillOnce(Return(sample_timer)); + EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _)); + auto controller = std::make_shared(makeConfig(yaml), fake_dispatcher, + runtime_, "test_prefix.", stats_); + + // Set the minRTT- this will trigger the timer for the next minRTT calculation. + EXPECT_CALL(*rtt_timer, enableTimer(std::chrono::milliseconds(45000), _)); + for (int ii = 1; ii <= 6; ++ii) { + tryForward(controller, true); + controller->recordLatencySample(std::chrono::milliseconds(5)); + } +} + +} // namespace +} // namespace ConcurrencyController +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/tools/spelling_dictionary.txt b/tools/spelling_dictionary.txt index 7bb603b68a4c..530f79bb325a 100644 --- a/tools/spelling_dictionary.txt +++ b/tools/spelling_dictionary.txt @@ -15,6 +15,7 @@ ASCII ASSERTs AWS BSON +CAS CB CBs CDS