From 2bf238b8c60f37303878101020a28d3dbd222453 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Wed, 31 Aug 2016 14:09:37 -0700 Subject: [PATCH] resource manager priorities (#46) --- .../envoy_service_to_service.template.json | 17 +++-- configs/routing_helper.template.json | 6 +- .../configuration/cluster_manager/cluster.rst | 28 ++------ .../cluster_circuit_breakers.rst | 55 +++++++++++++++ docs/intro/arch_overview/circuit_breaking.rst | 18 +++-- include/envoy/upstream/cluster_manager.h | 3 +- include/envoy/upstream/resource_manager.h | 7 ++ include/envoy/upstream/upstream.h | 5 +- source/common/http/async_client_impl.cc | 4 +- source/common/http/async_client_impl.h | 2 +- source/common/http/http1/conn_pool.cc | 12 ++-- source/common/http/http1/conn_pool.h | 11 +-- source/common/http/http2/conn_pool.cc | 10 +-- source/common/http/http2/conn_pool.h | 4 +- source/common/json/json_loader.cc | 12 +++- source/common/json/json_loader.h | 13 +++- source/common/router/retry_state_impl.cc | 12 ++-- source/common/router/retry_state_impl.h | 4 +- source/common/router/router.cc | 15 +++-- .../common/upstream/cluster_manager_impl.cc | 67 +++++++++++++------ source/common/upstream/cluster_manager_impl.h | 26 ++++--- .../common/upstream/resource_manager_impl.h | 2 + source/common/upstream/upstream_impl.cc | 36 ++++++++-- source/common/upstream/upstream_impl.h | 14 +++- test/common/http/async_client_impl_test.cc | 4 +- test/common/http/http1/conn_pool_test.cc | 2 +- test/common/http/http2/conn_pool_test.cc | 3 +- test/common/json/json_loader_test.cc | 11 +++ test/common/router/retry_state_impl_test.cc | 4 +- test/common/router/router_test.cc | 4 +- .../upstream/cluster_manager_impl_test.cc | 30 +++++++-- test/common/upstream/upstream_impl_test.cc | 43 ++++++++---- test/mocks/upstream/mocks.cc | 7 +- test/mocks/upstream/mocks.h | 5 +- 34 files changed, 356 insertions(+), 140 deletions(-) create mode 100644 docs/configuration/cluster_manager/cluster_circuit_breakers.rst diff --git 
a/configs/envoy_service_to_service.template.json b/configs/envoy_service_to_service.template.json index 374c5e7952de..f9bacca90c11 100644 --- a/configs/envoy_service_to_service.template.json +++ b/configs/envoy_service_to_service.template.json @@ -334,9 +334,14 @@ "connect_timeout_ms": 250, "type": "static", "lb_type": "round_robin", - "max_pending_requests": 30, {# Apply back pressure quickly at the local host level. NOTE: This - only is applicable with the HTTP/1.1 connection pool. #} - "max_connections": 100, + "circuit_breakers": { + "default": { + "max_pending_requests": 30, {# Apply back pressure quickly at the local host level. NOTE: + This only is applicable with the HTTP/1.1 connection + pool. #} + "max_connections": 100 + } + }, "hosts": [{"url": "tcp://127.0.0.1:8080"}] }, @@ -346,7 +351,11 @@ "type": "static", "lb_type": "round_robin", "features": "http2", - "max_requests": 200, + "circuit_breakers": { + "default": { + "max_requests": 200 + } + }, "hosts": [{"url": "tcp://127.0.0.1:8081"}] }, { diff --git a/configs/routing_helper.template.json b/configs/routing_helper.template.json index 672f2a92b0d9..5952073aad3f 100644 --- a/configs/routing_helper.template.json +++ b/configs/routing_helper.template.json @@ -26,7 +26,11 @@ "http_codec_options": "no_compression", "service_name": "{{ service }}", {% if 'max_requests' in options %} - "max_requests": {{ options['max_requests'] }}, + "circuit_breakers": { + "default": { + "max_requests": {{ options['max_requests'] }} + } + }, {% endif %} "health_check": { "type": "http", diff --git a/docs/configuration/cluster_manager/cluster.rst b/docs/configuration/cluster_manager/cluster.rst index fd3b00123590..18af25662aac 100644 --- a/docs/configuration/cluster_manager/cluster.rst +++ b/docs/configuration/cluster_manager/cluster.rst @@ -14,10 +14,7 @@ Cluster "service_name": "...", "health_check": "{...}", "max_requests_per_connection": "...", - "max_connections": "...", - "max_pending_requests": "...", - 
"max_requests": "...", - "max_retries": "...", + "circuit_breakers": "{...}", "ssl_context": "{...}", "features": "...", "http_codec_options": "...", @@ -96,25 +93,9 @@ max_requests_per_connection parameter is respected by both the HTTP/1.1 and HTTP/2 connection pool implementations. If not specified, there is no limit. Setting this parameter to 1 will effectively disable keep alive. -max_connections - *(optional, integer)* The maximum number of connections that Envoy will make to the upstream - cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview - ` for more information. - -max_pending_requests - *(optional, integer)* The maximum number of pending requests that Envoy will allow to the upstream - cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview - ` for more information. - -max_requests - *(optional, integer)* The maximum number of parallel requests that Envoy will make to the upstream - cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview - ` for more information. - -max_retries - *(optional, integer)* The maximum number of parallel retries that Envoy will allow to the upstream - cluster. If not specified, the default is 3. See the :ref:`circuit breaking overview - ` for more information. +:ref:`circuit_breakers ` + *(optional, object)* Optional :ref:`circuit breaking ` settings + for the cluster. :ref:`ssl_context ` *(optional, object)* The TLS configuration for connections to the upstream cluster. If no TLS @@ -150,6 +131,7 @@ alt_stat_name :hidden: cluster_hc + cluster_circuit_breakers cluster_ssl cluster_stats cluster_runtime diff --git a/docs/configuration/cluster_manager/cluster_circuit_breakers.rst b/docs/configuration/cluster_manager/cluster_circuit_breakers.rst new file mode 100644 index 000000000000..96cf3905773b --- /dev/null +++ b/docs/configuration/cluster_manager/cluster_circuit_breakers.rst @@ -0,0 +1,55 @@ +.. 
_config_cluster_manager_cluster_circuit_breakers: +Circuit breakers +================ + +Circuit breaking :ref:`architecture overview `. + +Circuit breaking settings can be specified individually for each defined priority. How the +different priorities are used is documented in the sections of the configuration guide that use +them. + +.. code-block:: json + + { + "default": "{...}", + "high": "{...}" + } + +default + *(optional, object)* Settings object for default priority. + +high + *(optional, object)* Settings object for high priority. + +Per priority settings +--------------------- + +.. code-block:: json + + { + "max_connections": "...", + "max_pending_requests": "...", + "max_requests": "...", + "max_retries": "..." + } + +max_connections + *(optional, integer)* The maximum number of connections that Envoy will make to the upstream + cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview + ` for more information. + +max_pending_requests + *(optional, integer)* The maximum number of pending requests that Envoy will allow to the upstream + cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview + ` for more information. + +max_requests + *(optional, integer)* The maximum number of parallel requests that Envoy will make to the upstream + cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview + ` for more information. + +max_retries + *(optional, integer)* The maximum number of parallel retries that Envoy will allow to the upstream + cluster. If not specified, the default is 3. See the :ref:`circuit breaking overview + ` for more information. diff --git a/docs/intro/arch_overview/circuit_breaking.rst b/docs/intro/arch_overview/circuit_breaking.rst index 3363ac251b51..e6928244aeb2 100644 --- a/docs/intro/arch_overview/circuit_breaking.rst +++ b/docs/intro/arch_overview/circuit_breaking.rst @@ -15,14 +15,20 @@ configure and code each application independently.
Envoy supports various types * **Cluster maximum pending requests**: The maximum number of requests that will be queued while waiting for a ready connection pool connection. In practice this is only applicable to HTTP/1.1 clusters since HTTP/2 connection pools never queue requests. HTTP/2 requests are multiplexed - immediately. + immediately. If this circuit breaker overflows, the :ref:`upstream_rq_pending_overflow + ` counter for the cluster will increment. * **Cluster maximum requests**: The maximum number of requests that can be outstanding to all hosts - in a cluster at any given time. + in a cluster at any given time. In practice this is applicable to HTTP/2 clusters since HTTP/1.1 + clusters are governed by the maximum connections circuit breaker. If this circuit breaker + overflows, the :ref:`upstream_rq_pending_overflow ` counter + for the cluster will increment. * **Cluster maximum active retries**: The maximum number of retries that can be outstanding to all hosts in a cluster at any given time. In general we recommend aggressively circuit breaking retries so that retries for sporadic failures are allowed but the overall retry volume cannot - explode and cause large scale cascading failure. + explode and cause large scale cascading failure. If this circuit breaker overflows, the + :ref:`upstream_rq_retry_overflow ` counter for the cluster + will increment. -Each circuit breaking limit is :ref:`configurable ` and tracked on a -per upstream cluster basis. This allows different components of the distributed system to be tuned -independently and have different limits. +Each circuit breaking limit is :ref:`configurable ` +and tracked on a per upstream cluster and per priority basis. This allows different components of +the distributed system to be tuned independently and have different limits.
diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 002c025fa0cb..439478ff338e 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -38,7 +38,8 @@ class ClusterManager { * * Can return nullptr if there is no host available in the cluster. */ - virtual Http::ConnectionPool::Instance* httpConnPoolForCluster(const std::string& cluster) PURE; + virtual Http::ConnectionPool::Instance* httpConnPoolForCluster(const std::string& cluster, + ResourcePriority priority) PURE; /** * Allocate a load balanced TCP connection for a cluster. The created connection is already diff --git a/include/envoy/upstream/resource_manager.h b/include/envoy/upstream/resource_manager.h index a3eba3c5777e..74a2b115aa1f 100644 --- a/include/envoy/upstream/resource_manager.h +++ b/include/envoy/upstream/resource_manager.h @@ -4,6 +4,13 @@ namespace Upstream { +/** + * Resource priority classes. The parallel NumResourcePriorities constant allows defining fixed + * arrays for each priority, but does not pollute the enum. + */ +enum class ResourcePriority { Default, High }; +const size_t NumResourcePriorities = 2; + /** * An individual resource tracked by the resource manager. */ diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 75d5cc3d2b37..f41c9ef84938 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -233,9 +233,10 @@ class Cluster : public virtual HostSet { virtual const std::string& name() const PURE; /** - * @return ResourceManager& the resource manager to use by proxy agents for this cluster. + * @return ResourceManager& the resource manager to use by proxy agents for this cluster (at + * a particular priority).
*/ - virtual ResourceManager& resourceManager() const PURE; + virtual ResourceManager& resourceManager(ResourcePriority priority) const PURE; /** * Shutdown the cluster prior to destroying connection pools and other thread local data. diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index e39a105e1ba9..afaf55bbe21a 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -23,7 +23,9 @@ AsyncClientImpl::~AsyncClientImpl() { ASSERT(active_requests_.empty()); } AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks, const Optional& timeout) { - ConnectionPool::Instance* conn_pool = factory_.connPool(); + // For now we use default priority for all requests. We could eventually expose priority out of + // send if needed. + ConnectionPool::Instance* conn_pool = factory_.connPool(Upstream::ResourcePriority::Default); if (!conn_pool) { callbacks.onFailure(AsyncClient::FailureReason::Reset); return nullptr; diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 39e5fa5132a6..1d183dc370b6 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -25,7 +25,7 @@ class AsyncClientConnPoolFactory { /** * Return a connection pool or nullptr if there is no healthy upstream host. 
*/ - virtual ConnectionPool::Instance* connPool() PURE; + virtual ConnectionPool::Instance* connPool(Upstream::ResourcePriority priority) PURE; }; class AsyncRequestImpl; diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 526ea93bd2a1..5f5f3f49b1a2 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -71,10 +71,10 @@ ConnectionPool::Cancellable* ConnPoolImpl::newStream(StreamDecoder& response_dec return nullptr; } - if (host_->cluster().resourceManager().pendingRequests().canCreate()) { + if (host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) { // If we have no connections at all, make one no matter what so we don't starve. if ((ready_clients_.size() == 0 && busy_clients_.size() == 0) || - host_->cluster().resourceManager().connections().canCreate()) { + host_->cluster().resourceManager(priority_).connections().canCreate()) { createNewConnection(); } @@ -243,12 +243,12 @@ ConnPoolImpl::PendingRequest::PendingRequest(ConnPoolImpl& parent, StreamDecoder : parent_(parent), decoder_(decoder), callbacks_(callbacks) { parent_.host_->cluster().stats().upstream_rq_pending_total_.inc(); parent_.host_->cluster().stats().upstream_rq_pending_active_.inc(); - parent_.host_->cluster().resourceManager().pendingRequests().inc(); + parent_.host_->cluster().resourceManager(parent_.priority_).pendingRequests().inc(); } ConnPoolImpl::PendingRequest::~PendingRequest() { parent_.host_->cluster().stats().upstream_rq_pending_active_.dec(); - parent_.host_->cluster().resourceManager().pendingRequests().dec(); + parent_.host_->cluster().resourceManager(parent_.priority_).pendingRequests().dec(); } ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) @@ -268,14 +268,14 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.host_->stats().cx_active_.inc(); conn_length_ = parent_.host_->cluster().stats().upstream_cx_length_ms_.allocateSpan(); 
connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout()); - parent_.host_->cluster().resourceManager().connections().inc(); + parent_.host_->cluster().resourceManager(parent_.priority_).connections().inc(); } ConnPoolImpl::ActiveClient::~ActiveClient() { parent_.host_->cluster().stats().upstream_cx_active_.dec(); parent_.host_->stats().cx_active_.dec(); conn_length_->complete(); - parent_.host_->cluster().resourceManager().connections().dec(); + parent_.host_->cluster().resourceManager(parent_.priority_).connections().dec(); } void ConnPoolImpl::ActiveClient::onBufferChange(Network::ConnectionBufferType type, uint64_t, diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index 0a9e6ce09929..f4750337ad0f 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -22,8 +22,9 @@ namespace Http1 { */ class ConnPoolImpl : Logger::Loggable, public ConnectionPool::Instance { public: - ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::ConstHostPtr host, Stats::Store& store) - : dispatcher_(dispatcher), host_(host), store_(store) {} + ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::ConstHostPtr host, Stats::Store& store, + Upstream::ResourcePriority priority) + : dispatcher_(dispatcher), host_(host), store_(store), priority_(priority) {} ~ConnPoolImpl(); @@ -127,6 +128,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: std::list pending_requests_; Stats::Store& store_; std::list drained_callbacks_; + Upstream::ResourcePriority priority_; }; /** @@ -134,8 +136,9 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: */ class ConnPoolImplProd : public ConnPoolImpl { public: - ConnPoolImplProd(Event::Dispatcher& dispatcher, Upstream::ConstHostPtr host, Stats::Store& store) - : ConnPoolImpl(dispatcher, host, store) {} + ConnPoolImplProd(Event::Dispatcher& dispatcher, Upstream::ConstHostPtr host, Stats::Store& store, + Upstream::ResourcePriority priority) 
+ : ConnPoolImpl(dispatcher, host, store, priority) {} // ConnPoolImpl CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override; diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index efda01a1ba75..cdec99a7c1f0 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -13,8 +13,8 @@ namespace Http { namespace Http2 { ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::ConstHostPtr host, - Stats::Store& store) - : dispatcher_(dispatcher), host_(host), stats_store_(store) {} + Stats::Store& store, Upstream::ResourcePriority priority) + : dispatcher_(dispatcher), host_(host), stats_store_(store), priority_(priority) {} ConnPoolImpl::~ConnPoolImpl() { if (primary_client_) { @@ -81,7 +81,7 @@ ConnectionPool::Cancellable* ConnPoolImpl::newStream(Http::StreamDecoder& respon } if (primary_client_->client_->numActiveRequests() >= maxConcurrentStreams() || - !host_->cluster().resourceManager().requests().canCreate()) { + !host_->cluster().resourceManager(priority_).requests().canCreate()) { log_debug("max requests overflow"); callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::Overflow, nullptr); host_->cluster().stats().upstream_rq_pending_overflow_.inc(); @@ -92,7 +92,7 @@ ConnectionPool::Cancellable* ConnPoolImpl::newStream(Http::StreamDecoder& respon host_->stats().rq_active_.inc(); host_->cluster().stats().upstream_rq_total_.inc(); host_->cluster().stats().upstream_rq_active_.inc(); - host_->cluster().resourceManager().requests().inc(); + host_->cluster().resourceManager(priority_).requests().inc(); callbacks.onPoolReady(primary_client_->client_->newStream(response_decoder), primary_client_->real_host_description_); } @@ -176,7 +176,7 @@ void ConnPoolImpl::onStreamDestroy(ActiveClient& client) { client.client_->numActiveRequests()); host_->stats().rq_active_.dec(); host_->cluster().stats().upstream_rq_active_.dec(); - 
host_->cluster().resourceManager().requests().dec(); + host_->cluster().resourceManager(priority_).requests().dec(); if (&client == draining_client_.get() && client.client_->numActiveRequests() == 0) { // Close out the draining client if we no long have active requests. client.client_->close(); diff --git a/source/common/http/http2/conn_pool.h b/source/common/http/http2/conn_pool.h index 83415e4a8240..672fa077b1b0 100644 --- a/source/common/http/http2/conn_pool.h +++ b/source/common/http/http2/conn_pool.h @@ -17,7 +17,8 @@ namespace Http2 { */ class ConnPoolImpl : Logger::Loggable, public ConnectionPool::Instance { public: - ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::ConstHostPtr host, Stats::Store& store); + ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::ConstHostPtr host, Stats::Store& store, + Upstream::ResourcePriority priority); ~ConnPoolImpl(); // Http::ConnectionPool::Instance @@ -76,6 +77,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: ActiveClientPtr primary_client_; ActiveClientPtr draining_client_; std::list drained_callbacks_; + Upstream::ResourcePriority priority_; }; /** diff --git a/source/common/json/json_loader.cc b/source/common/json/json_loader.cc index d0aa5710bc4b..c5c153e1f25d 100644 --- a/source/common/json/json_loader.cc +++ b/source/common/json/json_loader.cc @@ -26,6 +26,12 @@ StringLoader::StringLoader(const std::string& json) { StringLoader::~StringLoader() { json_decref(json_); } +Object::EmptyObject Object::empty_; + +Object::EmptyObject::EmptyObject() : json_(json_object()) {} + +Object::EmptyObject::~EmptyObject() { json_decref(json_); } + std::vector Object::asObjectArray() const { if (!json_is_array(json_)) { throw Exception(fmt::format("'{}' is not an array", name_)); @@ -73,8 +79,12 @@ int64_t Object::getInteger(const std::string& name, int64_t default_value) const } } -Object Object::getObject(const std::string& name) const { +Object Object::getObject(const std::string& name, bool 
allow_empty) const { json_t* object = json_object_get(json_, name.c_str()); + if (!object && allow_empty) { + object = empty_.json_; + } + if (!object) { throw Exception(fmt::format("key '{}' missing in '{}'", name, name_)); } diff --git a/source/common/json/json_loader.h b/source/common/json/json_loader.h index d9ffb284af5f..aa10ad08b8ba 100644 --- a/source/common/json/json_loader.h +++ b/source/common/json/json_loader.h @@ -67,9 +67,10 @@ class Object { /** * Get a sub-object by name. * @param name supplies the key name. + * @param allow_empty supplies whether to return an empty object if the key does not exist. * @return Object the sub-object. */ - Object getObject(const std::string& name) const; + Object getObject(const std::string& name, bool allow_empty = false) const; /** * Get an array by name. @@ -116,6 +117,16 @@ class Object { json_t* json_; std::string name_; + +private: + struct EmptyObject { + EmptyObject(); + ~EmptyObject(); + + json_t* json_; + }; + + static EmptyObject empty_; }; /** diff --git a/source/common/router/retry_state_impl.cc b/source/common/router/retry_state_impl.cc index dc28f469b3c5..aab8a3c4bd1d 100644 --- a/source/common/router/retry_state_impl.cc +++ b/source/common/router/retry_state_impl.cc @@ -16,8 +16,10 @@ const uint32_t RetryPolicy::RETRY_ON_RETRIABLE_4XX; RetryStateImpl::RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap& request_headers, const Upstream::Cluster& cluster, Runtime::Loader& runtime, - Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher) - : cluster_(cluster), runtime_(runtime), random_(random), dispatcher_(dispatcher) { + Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher, + Upstream::ResourcePriority priority) + : cluster_(cluster), runtime_(runtime), random_(random), dispatcher_(dispatcher), + priority_(priority) { retry_on_ = parseRetryOn(request_headers.get(Http::Headers::get().EnvoyRetryOn)); if (retry_on_ != 0) { const std::string& max_retries = 
request_headers.get(Http::Headers::get().EnvoyMaxRetries); @@ -71,7 +73,7 @@ uint32_t RetryStateImpl::parseRetryOn(const std::string& config) { void RetryStateImpl::resetRetry() { if (callback_) { - cluster_.resourceManager().retries().dec(); + cluster_.resourceManager(priority_).retries().dec(); callback_ = nullptr; } } @@ -100,14 +102,14 @@ bool RetryStateImpl::shouldRetry(const Http::HeaderMap* response_headers, return false; } - if (!cluster_.resourceManager().retries().canCreate()) { + if (!cluster_.resourceManager(priority_).retries().canCreate()) { cluster_.stats().upstream_rq_retry_overflow_.inc(); return false; } ASSERT(!callback_); callback_ = callback; - cluster_.resourceManager().retries().inc(); + cluster_.resourceManager(priority_).retries().inc(); cluster_.stats().upstream_rq_retry_.inc(); enableBackoffTimer(); return true; diff --git a/source/common/router/retry_state_impl.h b/source/common/router/retry_state_impl.h index c39630fb973c..6d8ec271ea43 100644 --- a/source/common/router/retry_state_impl.h +++ b/source/common/router/retry_state_impl.h @@ -17,7 +17,8 @@ class RetryStateImpl : public RetryState { public: RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap& request_headers, const Upstream::Cluster& cluster, Runtime::Loader& runtime, - Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher); + Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher, + Upstream::ResourcePriority priority); ~RetryStateImpl(); static uint32_t parseRetryOn(const std::string& config); @@ -43,6 +44,7 @@ class RetryStateImpl : public RetryState { uint32_t current_retry_{}; DoRetryCallback callback_; Event::TimerPtr retry_timer_; + Upstream::ResourcePriority priority_; }; } // Router diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 661f3ebeced9..07c8589b3dea 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -177,8 +177,9 @@ Http::FilterHeadersStatus 
Filter::decodeHeaders(Http::HeaderMap& headers, bool e } // Fetch a connection pool for the upstream cluster. - Http::ConnectionPool::Instance* conn_pool = - config_->cm_.httpConnPoolForCluster(route_->clusterName()); + // TODO: In follow up commit, specify priority. + Http::ConnectionPool::Instance* conn_pool = config_->cm_.httpConnPoolForCluster( + route_->clusterName(), Upstream::ResourcePriority::Default); if (!conn_pool) { sendNoHealthyUpstreamResponse(); return Http::FilterHeadersStatus::StopIteration; @@ -459,8 +460,9 @@ bool Filter::setupRetry(bool end_stream) { } void Filter::doRetry() { - Http::ConnectionPool::Instance* conn_pool = - config_->cm_.httpConnPoolForCluster(route_->clusterName()); + // TODO: In follow up commit, specify priority. + Http::ConnectionPool::Instance* conn_pool = config_->cm_.httpConnPoolForCluster( + route_->clusterName(), Upstream::ResourcePriority::Default); if (!conn_pool) { sendNoHealthyUpstreamResponse(); cleanup(); @@ -538,8 +540,9 @@ RetryStatePtr ProdFilter::createRetryState(const RetryPolicy& policy, Http::HeaderMap& request_headers, const Upstream::Cluster& cluster, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher) { - return RetryStatePtr{ - new RetryStateImpl(policy, request_headers, cluster, runtime, random, dispatcher)}; + // TODO: In follow up commit, specify priority. 
+ return RetryStatePtr{new RetryStateImpl(policy, request_headers, cluster, runtime, random, + dispatcher, Upstream::ResourcePriority::Default)}; } } // Router diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 84c301601a70..d8a24b116dc7 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -7,6 +7,7 @@ #include "envoy/network/dns.h" #include "envoy/runtime/runtime.h" +#include "common/common/enum_to_int.h" #include "common/http/http1/conn_pool.h" #include "common/http/http2/conn_pool.h" #include "common/http/async_client_impl.h" @@ -139,7 +140,7 @@ const Cluster* ClusterManagerImpl::get(const std::string& cluster) { } Http::ConnectionPool::Instance* -ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster) { +ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourcePriority priority) { ThreadLocalClusterManagerImpl& cluster_manager = tls_.getTyped(thread_local_slot_); @@ -149,7 +150,7 @@ ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster) { throw EnvoyException(fmt::format("unknown cluster '{}'", cluster)); } - return entry->second->connPool(); + return entry->second->connPool(priority); } void ClusterManagerImpl::postThreadLocalClusterUpdate(const ClusterImplBase& primary_cluster, @@ -213,24 +214,45 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl cluster.second->host_set_.addMemberUpdateCb( [this](const std::vector&, const std::vector& hosts_removed) -> void { // We need to go through and purge any connection pools for hosts that got deleted. - // Right now hosts are specific to clusters, so even if two hosts actually point - // to the same address this will be safe. - + // Even if two hosts actually point to the same address this will be safe, since if a + // host is readded it will be a different physical HostPtr. 
for (const HostPtr& old_host : hosts_removed) { - // Set a drained callback on the connection pool. When it is fully drained, we will - // destroy it. - auto conn_pool = host_http_conn_pool_map_.find(old_host); - if (conn_pool != host_http_conn_pool_map_.end()) { - conn_pool->second->addDrainedCallback([this, old_host]() -> void { - dispatcher_.deferredDelete(std::move(host_http_conn_pool_map_[old_host])); - host_http_conn_pool_map_.erase(old_host); - }); + auto container = host_http_conn_pool_map_.find(old_host); + if (container != host_http_conn_pool_map_.end()) { + drainConnPools(old_host, container->second); } } }); } } +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools( + HostPtr old_host, ConnPoolsContainer& container) { + for (const Http::ConnectionPool::InstancePtr& pool : container.pools_) { + if (pool) { + container.drains_remaining_++; + } + } + + for (const Http::ConnectionPool::InstancePtr& pool : container.pools_) { + if (!pool) { + continue; + } + + pool->addDrainedCallback([this, old_host]() -> void { + ConnPoolsContainer& container = host_http_conn_pool_map_[old_host]; + ASSERT(container.drains_remaining_ > 0); + container.drains_remaining_--; + if (container.drains_remaining_ == 0) { + for (Http::ConnectionPool::InstancePtr& pool : container.pools_) { + dispatcher_.deferredDelete(std::move(pool)); + } + host_http_conn_pool_map_.erase(old_host); + } + }); + } +} + void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( const std::string& name, ConstHostVectorPtr hosts, ConstHostVectorPtr healthy_hosts, ConstHostVectorPtr local_zone_hosts, ConstHostVectorPtr local_zone_healthy_hosts, @@ -273,31 +295,34 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( } Http::ConnectionPool::Instance* -ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool() { +ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( + ResourcePriority priority) { 
ConstHostPtr host = lb_->chooseHost(); if (!host) { primary_cluster_.stats().upstream_cx_none_healthy_.inc(); return nullptr; } - if (parent_.host_http_conn_pool_map_.find(host) == parent_.host_http_conn_pool_map_.end()) { - parent_.host_http_conn_pool_map_[host] = - parent_.parent_.allocateConnPool(parent_.dispatcher_, host, parent_.parent_.stats_); + ConnPoolsContainer& container = parent_.host_http_conn_pool_map_[host]; + ASSERT(enumToInt(priority) < container.pools_.size()); + if (!container.pools_[enumToInt(priority)]) { + container.pools_[enumToInt(priority)] = parent_.parent_.allocateConnPool( + parent_.dispatcher_, host, parent_.parent_.stats_, priority); } - return parent_.host_http_conn_pool_map_[host].get(); + return container.pools_[enumToInt(priority)].get(); } Http::ConnectionPool::InstancePtr ProdClusterManagerImpl::allocateConnPool(Event::Dispatcher& dispatcher, ConstHostPtr host, - Stats::Store& store) { + Stats::Store& store, ResourcePriority priority) { if ((host->cluster().features() & Cluster::Features::HTTP2) && runtime_.snapshot().featureEnabled("upstream.use_http2", 100)) { return Http::ConnectionPool::InstancePtr{ - new Http::Http2::ProdConnPoolImpl(dispatcher, host, store)}; + new Http::Http2::ProdConnPoolImpl(dispatcher, host, store, priority)}; } else { return Http::ConnectionPool::InstancePtr{ - new Http::Http1::ConnPoolImplProd(dispatcher, host, store)}; + new Http::Http1::ConnPoolImplProd(dispatcher, host, store, priority)}; } } diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 1dd4f682d779..8036914823c7 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -42,7 +42,8 @@ class ClusterManagerImpl : public ClusterManager { } const Cluster* get(const std::string& cluster) override; - Http::ConnectionPool::Instance* httpConnPoolForCluster(const std::string& cluster) override; + Http::ConnectionPool::Instance* 
httpConnPoolForCluster(const std::string& cluster, + ResourcePriority priority) override; Host::CreateConnectionData tcpConnForCluster(const std::string& cluster) override; Http::AsyncClient& httpAsyncClientForCluster(const std::string& cluster) override; @@ -62,6 +63,13 @@ class ClusterManagerImpl : public ClusterManager { * connection pools. */ struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject { + struct ConnPoolsContainer { + typedef std::array ConnPools; + + ConnPools pools_; + uint64_t drains_remaining_{}; + }; + struct ClusterEntry : public Http::AsyncClientConnPoolFactory { ClusterEntry(ThreadLocalClusterManagerImpl& parent, const Cluster& cluster, Runtime::Loader& runtime, Runtime::RandomGenerator& random, @@ -69,7 +77,7 @@ class ClusterManagerImpl : public ClusterManager { const std::string& local_zone_name); // Http::AsyncClientConnPoolFactory - Http::ConnectionPool::Instance* connPool() override; + Http::ConnectionPool::Instance* connPool(ResourcePriority priority) override; ThreadLocalClusterManagerImpl& parent_; HostSetImpl host_set_; @@ -83,7 +91,7 @@ class ClusterManagerImpl : public ClusterManager { ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Runtime::RandomGenerator& random, const std::string& local_zone_name); - + void drainConnPools(HostPtr old_host, ConnPoolsContainer& container); static void updateClusterMembership(const std::string& name, ConstHostVectorPtr hosts, ConstHostVectorPtr healthy_hosts, ConstHostVectorPtr local_zone_hosts, @@ -98,11 +106,12 @@ class ClusterManagerImpl : public ClusterManager { ClusterManagerImpl& parent_; Event::Dispatcher& dispatcher_; std::unordered_map thread_local_clusters_; - std::unordered_map host_http_conn_pool_map_; + std::unordered_map host_http_conn_pool_map_; }; - virtual Http::ConnectionPool::InstancePtr - allocateConnPool(Event::Dispatcher& dispatcher, ConstHostPtr host, Stats::Store& store) PURE; + 
virtual Http::ConnectionPool::InstancePtr allocateConnPool(Event::Dispatcher& dispatcher, + ConstHostPtr host, Stats::Store& store, + ResourcePriority priority) PURE; void loadCluster(const Json::Object& cluster, Stats::Store& stats, Network::DnsResolver& dns_resolver, Ssl::ContextManager& ssl_context_manager, Runtime::Loader& runtime, Runtime::RandomGenerator& random); @@ -129,8 +138,9 @@ class ProdClusterManagerImpl : public ClusterManagerImpl { private: // ClusterManagerImpl - Http::ConnectionPool::InstancePtr - allocateConnPool(Event::Dispatcher& dispatcher, ConstHostPtr host, Stats::Store& store) override; + Http::ConnectionPool::InstancePtr allocateConnPool(Event::Dispatcher& dispatcher, + ConstHostPtr host, Stats::Store& store, + ResourcePriority priority) override; }; } // Upstream diff --git a/source/common/upstream/resource_manager_impl.h b/source/common/upstream/resource_manager_impl.h index cf91a578a295..ae1bbf845e18 100644 --- a/source/common/upstream/resource_manager_impl.h +++ b/source/common/upstream/resource_manager_impl.h @@ -52,4 +52,6 @@ class ResourceManagerImpl : public ResourceManager { ResourceImpl retries_; }; +typedef std::unique_ptr ResourceManagerImplPtr; + } // Upstream diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index f15330670e62..6a0e267672d1 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -6,6 +6,7 @@ #include "envoy/ssl/context.h" #include "envoy/upstream/health_checker.h" +#include "common/common/enum_to_int.h" #include "common/common/utility.h" #include "common/http/utility.h" #include "common/json/json_loader.h" @@ -57,12 +58,8 @@ ClusterImplBase::ClusterImplBase(const Json::Object& config, Stats::Store& stats max_requests_per_connection_(config.getInteger("max_requests_per_connection", 0)), connect_timeout_(std::chrono::milliseconds(config.getInteger("connect_timeout_ms"))), stats_(generateStats(name_, stats)), 
alt_stat_name_(config.getString("alt_stat_name", "")), - resource_manager_(config.getInteger("max_connections", 1024), - config.getInteger("max_pending_requests", 1024), - config.getInteger("max_requests", 1024), - config.getInteger("max_retries", 3)), features_(parseFeatures(config)), - http_codec_options_(Http::Utility::parseCodecOptions(config)) { + http_codec_options_(Http::Utility::parseCodecOptions(config)), resource_managers_(config) { std::string string_lb_type = config.getString("lb_type"); if (string_lb_type == "round_robin") { @@ -83,8 +80,6 @@ ClusterImplBase::ClusterImplBase(const Json::Object& config, Stats::Store& stats } } -ClusterImplBase::~ClusterImplBase() {} - ConstHostVectorPtr ClusterImplBase::createHealthyHostList(const std::vector& hosts) { HostVectorPtr healthy_list(new std::vector()); for (auto host : hosts) { @@ -109,6 +104,11 @@ uint64_t ClusterImplBase::parseFeatures(const Json::Object& config) { return features; } +ResourceManager& ClusterImplBase::resourceManager(ResourcePriority priority) const { + ASSERT(enumToInt(priority) < resource_managers_.managers_.size()); + return *resource_managers_.managers_[enumToInt(priority)]; +} + void ClusterImplBase::runUpdateCallbacks(const std::vector& hosts_added, const std::vector& hosts_removed) { if (!hosts_added.empty() || !hosts_removed.empty()) { @@ -133,6 +133,28 @@ void ClusterImplBase::setHealthChecker(HealthCheckerPtr&& health_checker) { }); } +ClusterImplBase::ResourceManagers::ResourceManagers(const Json::Object& config) { + managers_[enumToInt(ResourcePriority::Default)] = load(config, "default"); + managers_[enumToInt(ResourcePriority::High)] = load(config, "high"); +} + +ResourceManagerImplPtr ClusterImplBase::ResourceManagers::load(const Json::Object& config, + const std::string& priority) { + uint64_t max_connections = 1024; + uint64_t max_pending_requests = 1024; + uint64_t max_requests = 1024; + uint64_t max_retries = 3; + + Json::Object settings = 
config.getObject("circuit_breakers", true).getObject(priority, true); + max_connections = settings.getInteger("max_connections", max_connections); + max_pending_requests = settings.getInteger("max_pending_requests", max_pending_requests); + max_requests = settings.getInteger("max_requests", max_requests); + max_retries = settings.getInteger("max_retries", max_retries); + + return ResourceManagerImplPtr{ + new ResourceManagerImpl(max_connections, max_pending_requests, max_requests, max_retries)}; +} + StaticClusterImpl::StaticClusterImpl(const Json::Object& config, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager) : ClusterImplBase(config, stats, ssl_context_manager) { diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index c37f92675b6f..d1d219d17315 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -155,13 +155,12 @@ class ClusterImplBase : public Cluster, LoadBalancerType lbType() const override { return lb_type_; } uint64_t maxRequestsPerConnection() const override { return max_requests_per_connection_; } const std::string& name() const override { return name_; } - ResourceManager& resourceManager() const override { return resource_manager_; } + ResourceManager& resourceManager(ResourcePriority priority) const override; ClusterStats& stats() const override { return stats_; } protected: ClusterImplBase(const Json::Object& config, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager); - ~ClusterImplBase(); static ConstHostVectorPtr createHealthyHostList(const std::vector& hosts); void runUpdateCallbacks(const std::vector& hosts_added, @@ -177,13 +176,22 @@ class ClusterImplBase : public Cluster, mutable ClusterStats stats_; HealthCheckerPtr health_checker_; std::string alt_stat_name_; - mutable ResourceManagerImpl resource_manager_; uint64_t features_; private: + struct ResourceManagers { + ResourceManagers(const Json::Object& config); + 
ResourceManagerImplPtr load(const Json::Object& config, const std::string& priority); + + typedef std::array Managers; + + Managers managers_; + }; + uint64_t parseFeatures(const Json::Object& config); const uint64_t http_codec_options_; + mutable ResourceManagers resource_managers_; }; typedef std::shared_ptr ClusterImplBasePtr; diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index cbc7c9ea0526..4636d52822d1 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -25,7 +25,9 @@ class AsyncClientImplTest : public testing::Test, public AsyncClientConnPoolFact } // Http::AsyncClientConnPoolFactory - Http::ConnectionPool::Instance* connPool() override { return &conn_pool_; } + Http::ConnectionPool::Instance* connPool(Upstream::ResourcePriority) override { + return &conn_pool_; + } std::string upstream_zone_{"to_az"}; MessagePtr message_{new RequestMessageImpl()}; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 55ebb5fcd1ba..0470a86d54ba 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -30,7 +30,7 @@ class ConnPoolImplForTest : public ConnPoolImpl { ConnPoolImplForTest(Event::MockDispatcher& dispatcher, Upstream::Cluster& cluster) : ConnPoolImpl(dispatcher, Upstream::HostPtr{new Upstream::HostImpl( cluster, "tcp://127.0.0.1:9000", false, 1, "")}, - stats_store_), + stats_store_, Upstream::ResourcePriority::Default), mock_dispatcher_(dispatcher) {} ~ConnPoolImplForTest() { diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index f9e80bdffe2f..645fdcbd5044 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -49,7 +49,8 @@ class Http2ConnPoolImplTest : public testing::Test { Event::MockTimer* connect_timer_; }; - Http2ConnPoolImplTest() : pool_(dispatcher_, host_, 
cluster_.stats_store_) {} + Http2ConnPoolImplTest() + : pool_(dispatcher_, host_, cluster_.stats_store_, Upstream::ResourcePriority::Default) {} ~Http2ConnPoolImplTest() { // Make sure all gauges are 0. diff --git a/test/common/json/json_loader_test.cc b/test/common/json/json_loader_test.cc index fc424f7149d0..f625801d724c 100644 --- a/test/common/json/json_loader_test.cc +++ b/test/common/json/json_loader_test.cc @@ -103,6 +103,17 @@ TEST(JsonLoaderTest, Basic) { std::vector array = config.getObjectArray("descriptors"); EXPECT_THROW(array[0].asObjectArray(), Exception); } + + { + std::string json = R"EOF( + { + } + )EOF"; + + Json::StringLoader config(json); + Object object = config.getObject("foo", true); + EXPECT_EQ(2, object.getInteger("bar", 2)); + } } TEST(JsonLoaderTest, Integer) { diff --git a/test/common/router/retry_state_impl_test.cc b/test/common/router/retry_state_impl_test.cc index 5c12711e2b20..ffc4f294771d 100644 --- a/test/common/router/retry_state_impl_test.cc +++ b/test/common/router/retry_state_impl_test.cc @@ -20,8 +20,8 @@ class RouterRetryStateImplTest : public testing::Test { } void setup(Http::HeaderMap& request_headers) { - state_.reset( - new RetryStateImpl{policy_, request_headers, cluster_, runtime_, random_, dispatcher_}); + state_.reset(new RetryStateImpl{policy_, request_headers, cluster_, runtime_, random_, + dispatcher_, Upstream::ResourcePriority::Default}); } void expectTimerCreateAndEnable() { diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 0aa5f78a8ec1..b980fc93895d 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -122,7 +122,7 @@ TEST_F(RouterTest, CancelBeforeBoundToPool) { } TEST_F(RouterTest, NoHost) { - EXPECT_CALL(cm_, httpConnPoolForCluster(_)).WillOnce(Return(nullptr)); + EXPECT_CALL(cm_, httpConnPoolForCluster(_, _)).WillOnce(Return(nullptr)); Http::HeaderMapImpl response_headers{ {":status", "503"}, {"content-length", "19"}, 
{"content-type", "text/plain"}}; @@ -294,7 +294,7 @@ TEST_F(RouterTest, RetryNoneHealthy) { router_.retry_state_->expectRetry(); encoder1.stream_.resetStream(Http::StreamResetReason::LocalReset); - EXPECT_CALL(cm_, httpConnPoolForCluster(_)).WillOnce(Return(nullptr)); + EXPECT_CALL(cm_, httpConnPoolForCluster(_, _)).WillOnce(Return(nullptr)); Http::HeaderMapImpl response_headers{ {":status", "503"}, {"content-length", "19"}, {"content-type", "text/plain"}}; EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(response_headers), false)); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 0bc5694e7ac1..df6cfd9d920d 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -22,7 +22,7 @@ class ClusterManagerImplForTest : public ClusterManagerImpl { using ClusterManagerImpl::ClusterManagerImpl; Http::ConnectionPool::InstancePtr allocateConnPool(Event::Dispatcher&, ConstHostPtr host, - Stats::Store&) override { + Stats::Store&, ResourcePriority) override { return Http::ConnectionPool::InstancePtr{allocateConnPool_(host)}; } @@ -177,7 +177,8 @@ TEST_F(ClusterManagerImplTest, UnknownCluster) { Json::StringLoader loader(json); create(loader); EXPECT_EQ(nullptr, cluster_manager_->get("hello")); - EXPECT_THROW(cluster_manager_->httpConnPoolForCluster("hello"), EnvoyException); + EXPECT_THROW(cluster_manager_->httpConnPoolForCluster("hello", ResourcePriority::Default), + EnvoyException); EXPECT_THROW(cluster_manager_->tcpConnForCluster("hello"), EnvoyException); EXPECT_THROW(cluster_manager_->httpAsyncClientForCluster("hello"), EnvoyException); } @@ -204,7 +205,8 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { create(loader); // Test for no hosts returning the correct values before we have hosts. 
- EXPECT_EQ(nullptr, cluster_manager_->httpConnPoolForCluster("cluster_1")); + EXPECT_EQ(nullptr, + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default)); EXPECT_EQ(nullptr, cluster_manager_->tcpConnForCluster("cluster_1").connection_); EXPECT_EQ(2UL, stats_.counter("cluster.cluster_1.upstream_cx_none_healthy").value()); @@ -221,30 +223,44 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); EXPECT_CALL(*cluster_manager_, allocateConnPool_(_)) - .Times(2) + .Times(4) .WillRepeatedly(ReturnNew()); // This should provide us a CP for each of the above hosts. Http::ConnectionPool::MockInstance* cp1 = dynamic_cast( - cluster_manager_->httpConnPoolForCluster("cluster_1")); + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default)); Http::ConnectionPool::MockInstance* cp2 = dynamic_cast( - cluster_manager_->httpConnPoolForCluster("cluster_1")); + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default)); + Http::ConnectionPool::MockInstance* cp1_high = dynamic_cast( + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::High)); + Http::ConnectionPool::MockInstance* cp2_high = dynamic_cast( + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::High)); EXPECT_NE(cp1, cp2); + EXPECT_NE(cp1_high, cp2_high); + EXPECT_NE(cp1, cp1_high); Http::ConnectionPool::Instance::DrainedCb drained_cb; EXPECT_CALL(*cp1, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb)); + Http::ConnectionPool::Instance::DrainedCb drained_cb_high; + EXPECT_CALL(*cp1_high, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb_high)); // Remove the first host, this should lead to the first cp being drained. 
dns_timer_->callback_(); dns_callback({"127.0.0.2"}); drained_cb(); drained_cb = nullptr; + EXPECT_CALL(tls_.dispatcher_, deferredDelete_(_)).Times(2); + drained_cb_high(); + drained_cb_high = nullptr; // Make sure we get back the same connection pool for the 2nd host as we did before the change. Http::ConnectionPool::MockInstance* cp3 = dynamic_cast( - cluster_manager_->httpConnPoolForCluster("cluster_1")); + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default)); + Http::ConnectionPool::MockInstance* cp3_high = dynamic_cast( + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::High)); EXPECT_EQ(cp2, cp3); + EXPECT_EQ(cp2_high, cp3_high); // Now add and remove a host that we never have a conn pool to. This should not lead to any // drain callbacks, etc. diff --git a/test/common/upstream/upstream_impl_test.cc b/test/common/upstream/upstream_impl_test.cc index 516d9144525f..6f0d8a43d949 100644 --- a/test/common/upstream/upstream_impl_test.cc +++ b/test/common/upstream/upstream_impl_test.cc @@ -57,9 +57,20 @@ TEST(StrictDnsClusterImplTest, Basic) { "connect_timeout_ms": 250, "type": "strict_dns", "lb_type": "round_robin", - "max_connections": 43, - "max_pending_requests": 57, - "max_requests": 50, + "circuit_breakers": { + "default": { + "max_connections": 43, + "max_pending_requests": 57, + "max_requests": 50, + "max_retries": 10 + }, + "high": { + "max_connections": 1, + "max_pending_requests": 2, + "max_requests": 3, + "max_retries": 4 + } + }, "max_requests_per_connection": 3, "http_codec_options": "no_compression", "hosts": [{"url": "tcp://localhost:11001"}, @@ -69,9 +80,14 @@ TEST(StrictDnsClusterImplTest, Basic) { Json::StringLoader loader(json); StrictDnsClusterImpl cluster(loader, stats, ssl_context_manager, dns_resolver); - EXPECT_EQ(43U, cluster.resourceManager().connections().max()); - EXPECT_EQ(57U, cluster.resourceManager().pendingRequests().max()); - EXPECT_EQ(50U, 
cluster.resourceManager().requests().max()); + EXPECT_EQ(43U, cluster.resourceManager(ResourcePriority::Default).connections().max()); + EXPECT_EQ(57U, cluster.resourceManager(ResourcePriority::Default).pendingRequests().max()); + EXPECT_EQ(50U, cluster.resourceManager(ResourcePriority::Default).requests().max()); + EXPECT_EQ(10U, cluster.resourceManager(ResourcePriority::Default).retries().max()); + EXPECT_EQ(1U, cluster.resourceManager(ResourcePriority::High).connections().max()); + EXPECT_EQ(2U, cluster.resourceManager(ResourcePriority::High).pendingRequests().max()); + EXPECT_EQ(3U, cluster.resourceManager(ResourcePriority::High).requests().max()); + EXPECT_EQ(4U, cluster.resourceManager(ResourcePriority::High).retries().max()); EXPECT_EQ(3U, cluster.maxRequestsPerConnection()); EXPECT_EQ(Http::CodecOptions::NoCompression, cluster.httpCodecOptions()); ReadyWatcher membership_updated; @@ -183,9 +199,14 @@ TEST(StaticClusterImplTest, UrlConfig) { Json::StringLoader config(json); StaticClusterImpl cluster(config, stats, ssl_context_manager); - EXPECT_EQ(1024U, cluster.resourceManager().connections().max()); - EXPECT_EQ(1024U, cluster.resourceManager().pendingRequests().max()); - EXPECT_EQ(1024U, cluster.resourceManager().requests().max()); + EXPECT_EQ(1024U, cluster.resourceManager(ResourcePriority::Default).connections().max()); + EXPECT_EQ(1024U, cluster.resourceManager(ResourcePriority::Default).pendingRequests().max()); + EXPECT_EQ(1024U, cluster.resourceManager(ResourcePriority::Default).requests().max()); + EXPECT_EQ(3U, cluster.resourceManager(ResourcePriority::Default).retries().max()); + EXPECT_EQ(1024U, cluster.resourceManager(ResourcePriority::High).connections().max()); + EXPECT_EQ(1024U, cluster.resourceManager(ResourcePriority::High).pendingRequests().max()); + EXPECT_EQ(1024U, cluster.resourceManager(ResourcePriority::High).requests().max()); + EXPECT_EQ(3U, cluster.resourceManager(ResourcePriority::High).retries().max()); EXPECT_EQ(0U, 
cluster.maxRequestsPerConnection()); EXPECT_EQ(0U, cluster.httpCodecOptions()); EXPECT_EQ(LoadBalancerType::Random, cluster.lbType()); @@ -205,7 +226,6 @@ TEST(StaticClusterImplTest, BothAddressPortAndURLConfig) { "connect_timeout_ms": 250, "type": "static", "lb_type": "round_robin", - "max_connections": 43, "hosts": [{"address": "1.2.3.4", "port": 99, "url": "tcp://192.168.1.1:22"}, {"address":"5.6.7.8", "port": 63, "url": "tcp://192.168.1.2:44"}] } @@ -226,7 +246,6 @@ TEST(StaticClusterImplTest, AddressMissingPortConfig) { "connect_timeout_ms": 250, "type": "static", "lb_type": "round_robin", - "max_connections": 43, "hosts": [{"address": "1.2.3.4"}, {"address":"5.6.7.8"}] } @@ -245,7 +264,6 @@ TEST(StaticClusterImplTest, UnsupportedLBType) { "connect_timeout_ms": 250, "type": "static", "lb_type": "fakelbtype", - "max_connections": 43, "hosts": [{"address": "1.2.3.4", "port": 99, "url": "tcp://192.168.1.1:22"}, {"address":"5.6.7.8", "port": 63, "url": "tcp://192.168.1.2:44"}] } @@ -265,7 +283,6 @@ TEST(StaticClusterImplTest, UnsupportedFeature) { "type": "static", "lb_type": "round_robin", "features": "fake", - "max_connections": 43, "hosts": [{"address": "1.2.3.4", "port": 99, "url": "tcp://192.168.1.1:22"}, {"address":"5.6.7.8", "port": 63, "url": "tcp://192.168.1.2:44"}] } diff --git a/test/mocks/upstream/mocks.cc b/test/mocks/upstream/mocks.cc index e67698ea4dd5..bf0ff5b2a440 100644 --- a/test/mocks/upstream/mocks.cc +++ b/test/mocks/upstream/mocks.cc @@ -33,14 +33,15 @@ MockCluster::MockCluster() ON_CALL(*this, maxRequestsPerConnection()) .WillByDefault(ReturnPointee(&max_requests_per_connection_)); ON_CALL(*this, stats()).WillByDefault(ReturnRef(stats_)); - ON_CALL(*this, resourceManager()) - .WillByDefault(Invoke([this]() -> Upstream::ResourceManager& { return *resource_manager_; })); + ON_CALL(*this, resourceManager(_)) + .WillByDefault(Invoke([this](ResourcePriority) + -> Upstream::ResourceManager& { return *resource_manager_; })); } 
MockCluster::~MockCluster() {} MockClusterManager::MockClusterManager() { - ON_CALL(*this, httpConnPoolForCluster(_)).WillByDefault(Return(&conn_pool_)); + ON_CALL(*this, httpConnPoolForCluster(_, _)).WillByDefault(Return(&conn_pool_)); ON_CALL(*this, get(_)).WillByDefault(Return(&cluster_)); } diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index a016feae258a..98d6f4e81072 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -41,7 +41,7 @@ class MockCluster : public Cluster { MOCK_CONST_METHOD0(lbType, LoadBalancerType()); MOCK_CONST_METHOD0(maxRequestsPerConnection, uint64_t()); MOCK_CONST_METHOD0(name, const std::string&()); - MOCK_CONST_METHOD0(resourceManager, ResourceManager&()); + MOCK_CONST_METHOD1(resourceManager, ResourceManager&(ResourcePriority priority)); MOCK_METHOD0(shutdown, void()); MOCK_CONST_METHOD0(stats, ClusterStats&()); @@ -70,7 +70,8 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD1(setInitializedCb, void(std::function)); MOCK_METHOD0(clusters, std::unordered_map()); MOCK_METHOD1(get, const Cluster*(const std::string& cluster)); - MOCK_METHOD1(httpConnPoolForCluster, Http::ConnectionPool::Instance*(const std::string& cluster)); + MOCK_METHOD2(httpConnPoolForCluster, Http::ConnectionPool::Instance*(const std::string& cluster, + ResourcePriority priority)); MOCK_METHOD1(tcpConnForCluster_, MockHost::MockCreateConnectionData(const std::string& cluster)); MOCK_METHOD1(httpAsyncClientForCluster, Http::AsyncClient&(const std::string& cluster)); MOCK_METHOD0(shutdown, void());