Skip to content

Commit

Permalink
Allow circuit breaker settings to be runtime controllable (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
junr03 authored Sep 29, 2016
1 parent add5778 commit f095ab5
Show file tree
Hide file tree
Showing 22 changed files with 159 additions and 52 deletions.
2 changes: 2 additions & 0 deletions docs/configuration/cluster_manager/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Cluster
"alt_stat_name": "..."
}
.. _config_cluster_manager_cluster_name:

name
*(required, string)* Supplies the name of the cluster which must be unique across all clusters.
The cluster name is used when emitting :ref:`statistics <config_cluster_manager_cluster_stats>`.
Expand Down
15 changes: 15 additions & 0 deletions docs/configuration/cluster_manager/cluster_circuit_breakers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,32 @@ max_connections
cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview
<arch_overview_circuit_break>` for more information.

.. _config_cluster_manager_cluster_circuit_breakers_max_pending_requests:

max_pending_requests
*(optional, integer)* The maximum number of pending requests that Envoy will allow to the upstream
cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview
<arch_overview_circuit_break>` for more information.

.. _config_cluster_manager_cluster_circuit_breakers_max_requests:

max_requests
*(optional, integer)* The maximum number of parallel requests that Envoy will make to the upstream
cluster. If not specified, the default is 1024. See the :ref:`circuit breaking overview
<arch_overview_circuit_break>` for more information.

.. _config_cluster_manager_cluster_circuit_breakers_max_retries:

max_retries
*(optional, integer)* The maximum number of parallel retries that Envoy will allow to the upstream
cluster. If not specified, the default is 3. See the :ref:`circuit breaking overview
<arch_overview_circuit_break>` for more information.

Runtime
-------

All four circuit breaking settings are runtime configurable for all defined priorities based on cluster
name. They follow the following naming scheme ``circuit_breakers.<cluster_name>.<priority>.<setting>``.
``cluster_name`` is the name field in each cluster's configuration, which is set in the envoy
:ref:`config file <config_cluster_manager_cluster_name>`. Available runtime settings will override
settings set in the envoy config file.
12 changes: 12 additions & 0 deletions docs/configuration/cluster_manager/cluster_runtime.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,15 @@ upstream.zone_routing.healthy_panic_threshold
Defines the :ref:`zone healthy panic threshold <arch_overview_load_balancing_zone_panic_threshold>`
percentage. Defaults to 80%. If the % of healthy hosts in the current zone falls below this %
all healthy hosts will be used for routing.

circuit_breakers.<cluster_name>.<priority>.max_connections
:ref:`Max connections circuit breaker setting <config_cluster_manager_cluster_circuit_breakers_max_connections>`

circuit_breakers.<cluster_name>.<priority>.max_pending_requests
:ref:`Max pending requests circuit breaker setting <config_cluster_manager_cluster_circuit_breakers_max_pending_requests>`

circuit_breakers.<cluster_name>.<priority>.max_requests
:ref:`Max requests circuit breaker setting <config_cluster_manager_cluster_circuit_breakers_max_requests>`

circuit_breakers.<cluster_name>.<priority>.max_retries
:ref:`Max retries circuit breaker setting <config_cluster_manager_cluster_circuit_breakers_max_retries>`
9 changes: 5 additions & 4 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ void ClusterManagerImpl::loadCluster(const Json::Object& cluster, Stats::Store&
std::string string_type = cluster.getString("type");
ClusterImplBasePtr new_cluster;
if (string_type == "static") {
new_cluster.reset(new StaticClusterImpl(cluster, stats, ssl_context_manager));
new_cluster.reset(new StaticClusterImpl(cluster, runtime, stats, ssl_context_manager));
} else if (string_type == "strict_dns") {
new_cluster.reset(new StrictDnsClusterImpl(cluster, stats, ssl_context_manager, dns_resolver));
new_cluster.reset(
new StrictDnsClusterImpl(cluster, runtime, stats, ssl_context_manager, dns_resolver));
} else if (string_type == "logical_dns") {
new_cluster.reset(
new LogicalDnsCluster(cluster, stats, ssl_context_manager, dns_resolver, tls_));
new LogicalDnsCluster(cluster, runtime, stats, ssl_context_manager, dns_resolver, tls_));
} else if (string_type == "sds") {
if (!sds_config_.valid()) {
throw EnvoyException("cannot create an sds cluster without an sds config");
}

sds_clusters_.push_back(new SdsClusterImpl(cluster, stats, ssl_context_manager,
sds_clusters_.push_back(new SdsClusterImpl(cluster, runtime, stats, ssl_context_manager,
sds_config_.value(), *this,
dns_resolver.dispatcher(), random));
new_cluster.reset(sds_clusters_.back());
Expand Down
8 changes: 4 additions & 4 deletions source/common/upstream/logical_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace Upstream {

LogicalDnsCluster::LogicalDnsCluster(const Json::Object& config, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager,
LogicalDnsCluster::LogicalDnsCluster(const Json::Object& config, Runtime::Loader& runtime,
Stats::Store& stats, Ssl::ContextManager& ssl_context_manager,
Network::DnsResolver& dns_resolver, ThreadLocal::Instance& tls)
: ClusterImplBase(config, stats, ssl_context_manager), dns_resolver_(dns_resolver), tls_(tls),
tls_slot_(tls.allocateSlot()),
: ClusterImplBase(config, runtime, stats, ssl_context_manager), dns_resolver_(dns_resolver),
tls_(tls), tls_slot_(tls.allocateSlot()),
resolve_timer_(dns_resolver.dispatcher().createTimer([this]() -> void { startResolve(); })) {

std::vector<Json::Object> hosts_json = config.getObjectArray("hosts");
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/logical_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Upstream {
*/
class LogicalDnsCluster : public ClusterImplBase {
public:
LogicalDnsCluster(const Json::Object& config, Stats::Store& stats,
LogicalDnsCluster(const Json::Object& config, Runtime::Loader& runtime, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager, Network::DnsResolver& dns_resolver,
ThreadLocal::Instance& tls);

Expand Down
19 changes: 13 additions & 6 deletions source/common/upstream/resource_manager_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "envoy/runtime/runtime.h"
#include "envoy/upstream/resource_manager.h"

#include "common/common/assert.h"
Expand All @@ -17,10 +18,13 @@ namespace Upstream {
*/
class ResourceManagerImpl : public ResourceManager {
public:
ResourceManagerImpl(uint64_t max_connections, uint64_t max_pending_requests,
ResourceManagerImpl(Runtime::Loader& runtime, const std::string& runtime_key,
uint64_t max_connections, uint64_t max_pending_requests,
uint64_t max_requests, uint64_t max_retries)
: connections_(max_connections), pending_requests_(max_pending_requests),
requests_(max_requests), retries_(max_retries) {}
: connections_(max_connections, runtime, runtime_key + "max_connections"),
pending_requests_(max_pending_requests, runtime, runtime_key + "max_pending_requests"),
requests_(max_requests, runtime, runtime_key + "max_requests"),
retries_(max_retries, runtime, runtime_key + "max_retries") {}

// Upstream::ResourceManager
Resource& connections() override { return connections_; }
Expand All @@ -30,20 +34,23 @@ class ResourceManagerImpl : public ResourceManager {

private:
struct ResourceImpl : public Resource {
ResourceImpl(uint64_t max) : max_(max) {}
ResourceImpl(uint64_t max, Runtime::Loader& runtime, const std::string& runtime_key)
: max_(max), runtime_(runtime), runtime_key_(runtime_key) {}
~ResourceImpl() { ASSERT(current_ == 0); }

// Upstream::Resource
bool canCreate() override { return current_ < max_; }
bool canCreate() override { return current_ < max(); }
void inc() override { current_++; }
void dec() override {
ASSERT(current_ > 0);
current_--;
}
uint64_t max() override { return max_; }
uint64_t max() override { return runtime_.snapshot().getInteger(runtime_key_, max_); }

const uint64_t max_;
std::atomic<uint64_t> current_{};
Runtime::Loader& runtime_;
std::string runtime_key_;
};

ResourceImpl connections_;
Expand Down
8 changes: 4 additions & 4 deletions source/common/upstream/sds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

namespace Upstream {

SdsClusterImpl::SdsClusterImpl(const Json::Object& config, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager,
SdsClusterImpl::SdsClusterImpl(const Json::Object& config, Runtime::Loader& runtime,
Stats::Store& stats, Ssl::ContextManager& ssl_context_manager,
const SdsConfig& sds_config, ClusterManager& cm,
Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random)
: BaseDynamicClusterImpl(config, stats, ssl_context_manager), cm_(cm), sds_config_(sds_config),
service_name_(config.getString("service_name")), random_(random),
: BaseDynamicClusterImpl(config, runtime, stats, ssl_context_manager), cm_(cm),
sds_config_(sds_config), service_name_(config.getString("service_name")), random_(random),
refresh_timer_(dispatcher.createTimer([this]() -> void { refreshHosts(); })) {}

SdsClusterImpl::~SdsClusterImpl() {}
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/sds.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct SdsConfig {
*/
class SdsClusterImpl : public BaseDynamicClusterImpl, public Http::AsyncClient::Callbacks {
public:
SdsClusterImpl(const Json::Object& config, Stats::Store& stats,
SdsClusterImpl(const Json::Object& config, Runtime::Loader& runtime, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager, const SdsConfig& sds_config,
ClusterManager& cm, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random);
Expand Down
34 changes: 21 additions & 13 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ void HostSetImpl::runUpdateCallbacks(const std::vector<HostPtr>& hosts_added,

const ConstHostVectorPtr ClusterImplBase::empty_host_list_{new std::vector<HostPtr>{}};

ClusterImplBase::ClusterImplBase(const Json::Object& config, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager)
ClusterImplBase::ClusterImplBase(const Json::Object& config, Runtime::Loader& runtime,
Stats::Store& stats, Ssl::ContextManager& ssl_context_manager)
: name_(config.getString("name")),
max_requests_per_connection_(config.getInteger("max_requests_per_connection", 0)),
connect_timeout_(std::chrono::milliseconds(config.getInteger("connect_timeout_ms"))),
stats_(generateStats(name_, stats)), alt_stat_name_(config.getString("alt_stat_name", "")),
features_(parseFeatures(config)),
http_codec_options_(Http::Utility::parseCodecOptions(config)), resource_managers_(config) {
http_codec_options_(Http::Utility::parseCodecOptions(config)),
resource_managers_(config, runtime, name_) {

std::string string_lb_type = config.getString("lb_type");
if (string_lb_type == "round_robin") {
Expand Down Expand Up @@ -133,31 +134,36 @@ void ClusterImplBase::setHealthChecker(HealthCheckerPtr&& health_checker) {
});
}

ClusterImplBase::ResourceManagers::ResourceManagers(const Json::Object& config) {
managers_[enumToInt(ResourcePriority::Default)] = load(config, "default");
managers_[enumToInt(ResourcePriority::High)] = load(config, "high");
ClusterImplBase::ResourceManagers::ResourceManagers(const Json::Object& config,
Runtime::Loader& runtime,
const std::string& cluster_name) {
managers_[enumToInt(ResourcePriority::Default)] = load(config, runtime, cluster_name, "default");
managers_[enumToInt(ResourcePriority::High)] = load(config, runtime, cluster_name, "high");
}

ResourceManagerImplPtr ClusterImplBase::ResourceManagers::load(const Json::Object& config,
Runtime::Loader& runtime,
const std::string& cluster_name,
const std::string& priority) {
uint64_t max_connections = 1024;
uint64_t max_pending_requests = 1024;
uint64_t max_requests = 1024;
uint64_t max_retries = 3;
std::string runtime_prefix = fmt::format("circuit_breakers.{}.{}.", cluster_name, priority);

Json::Object settings = config.getObject("circuit_breakers", true).getObject(priority, true);
max_connections = settings.getInteger("max_connections", max_connections);
max_pending_requests = settings.getInteger("max_pending_requests", max_pending_requests);
max_requests = settings.getInteger("max_requests", max_requests);
max_retries = settings.getInteger("max_retries", max_retries);

return ResourceManagerImplPtr{
new ResourceManagerImpl(max_connections, max_pending_requests, max_requests, max_retries)};
return ResourceManagerImplPtr{new ResourceManagerImpl(
runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries)};
}

StaticClusterImpl::StaticClusterImpl(const Json::Object& config, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager)
: ClusterImplBase(config, stats, ssl_context_manager) {
StaticClusterImpl::StaticClusterImpl(const Json::Object& config, Runtime::Loader& runtime,
Stats::Store& stats, Ssl::ContextManager& ssl_context_manager)
: ClusterImplBase(config, runtime, stats, ssl_context_manager) {
std::vector<Json::Object> hosts_json = config.getObjectArray("hosts");
HostVectorPtr new_hosts(new std::vector<HostPtr>());
for (Json::Object& host : hosts_json) {
Expand Down Expand Up @@ -249,10 +255,12 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
}
}

StrictDnsClusterImpl::StrictDnsClusterImpl(const Json::Object& config, Stats::Store& stats,
StrictDnsClusterImpl::StrictDnsClusterImpl(const Json::Object& config, Runtime::Loader& runtime,
Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager,
Network::DnsResolver& dns_resolver)
: BaseDynamicClusterImpl(config, stats, ssl_context_manager), dns_resolver_(dns_resolver) {
: BaseDynamicClusterImpl(config, runtime, stats, ssl_context_manager),
dns_resolver_(dns_resolver) {
for (Json::Object& host : config.getObjectArray("hosts")) {
resolve_targets_.emplace_back(new ResolveTarget(*this, host.getString("url")));
}
Expand Down
13 changes: 8 additions & 5 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/event/timer.h"
#include "envoy/network/dns.h"
#include "envoy/runtime/runtime.h"
#include "envoy/ssl/context_manager.h"
#include "envoy/upstream/health_checker.h"
#include "envoy/upstream/load_balancer.h"
Expand Down Expand Up @@ -159,7 +160,7 @@ class ClusterImplBase : public Cluster,
ClusterStats& stats() const override { return stats_; }

protected:
ClusterImplBase(const Json::Object& config, Stats::Store& stats,
ClusterImplBase(const Json::Object& config, Runtime::Loader& runtime, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager);

static ConstHostVectorPtr createHealthyHostList(const std::vector<HostPtr>& hosts);
Expand All @@ -180,8 +181,10 @@ class ClusterImplBase : public Cluster,

private:
struct ResourceManagers {
ResourceManagers(const Json::Object& config);
ResourceManagerImplPtr load(const Json::Object& config, const std::string& priority);
ResourceManagers(const Json::Object& config, Runtime::Loader& runtime,
const std::string& cluster_name);
ResourceManagerImplPtr load(const Json::Object& config, Runtime::Loader& runtime,
const std::string& cluster_name, const std::string& priority);

typedef std::array<ResourceManagerImplPtr, NumResourcePriorities> Managers;

Expand All @@ -202,7 +205,7 @@ typedef std::shared_ptr<ClusterImplBase> ClusterImplBasePtr;
*/
class StaticClusterImpl : public ClusterImplBase {
public:
StaticClusterImpl(const Json::Object& config, Stats::Store& stats,
StaticClusterImpl(const Json::Object& config, Runtime::Loader& runtime, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager);

// Upstream::Cluster
Expand Down Expand Up @@ -236,7 +239,7 @@ class BaseDynamicClusterImpl : public ClusterImplBase {
*/
class StrictDnsClusterImpl : public BaseDynamicClusterImpl {
public:
StrictDnsClusterImpl(const Json::Object& config, Stats::Store& stats,
StrictDnsClusterImpl(const Json::Object& config, Runtime::Loader& runtime, Stats::Store& stats,
Ssl::ContextManager& ssl_context_manager,
Network::DnsResolver& dns_resolver);
~StrictDnsClusterImpl();
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ add_executable(envoy-test
common/upstream/health_checker_impl_test.cc
common/upstream/load_balancer_impl_test.cc
common/upstream/logical_dns_cluster_test.cc
common/upstream/resource_manager_impl_test.cc
common/upstream/sds_test.cc
common/upstream/upstream_impl_test.cc
example_configs_test.cc
Expand Down
5 changes: 4 additions & 1 deletion test/common/filter/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "test/mocks/buffer/mocks.h"
#include "test/mocks/network/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/upstream/host.h"
#include "test/mocks/upstream/mocks.h"

Expand Down Expand Up @@ -74,6 +75,7 @@ class TcpProxyTest : public testing::Test {
Network::ReadFilterPtr upstream_read_filter_;
NiceMock<Event::MockTimer>* connect_timer_{};
std::unique_ptr<TcpProxy> filter_;
NiceMock<Runtime::MockLoader> runtime_;
};

TEST_F(TcpProxyTest, UpstreamDisconnect) {
Expand Down Expand Up @@ -161,7 +163,8 @@ TEST_F(TcpProxyTest, UpstreamConnectionLimit) {
// setup sets up expectation for tcpConnForCluster but this test is expected to NOT call that
filter_.reset(new TcpProxy(config_, cluster_manager_));
filter_->initializeReadFilterCallbacks(filter_callbacks_);
cluster_manager_.cluster_.resource_manager_.reset(new Upstream::ResourceManagerImpl(0, 0, 0, 0));
cluster_manager_.cluster_.resource_manager_.reset(
new Upstream::ResourceManagerImpl(runtime_, "fake_key", 0, 0, 0, 0));

Buffer::OwnedImpl buffer("hello");
// The downstream connection closes if the proxy can't make an upstream connection.
Expand Down
8 changes: 6 additions & 2 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "test/mocks/event/mocks.h"
#include "test/mocks/http/mocks.h"
#include "test/mocks/network/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/upstream/mocks.h"

using testing::_;
Expand Down Expand Up @@ -103,6 +104,7 @@ class Http1ConnPoolImplTest : public testing::Test {
NiceMock<Event::MockDispatcher> dispatcher_;
NiceMock<Upstream::MockCluster> cluster_;
ConnPoolImplForTest conn_pool_;
NiceMock<Runtime::MockLoader> runtime_;
};

/**
Expand Down Expand Up @@ -195,7 +197,8 @@ TEST_F(Http1ConnPoolImplTest, MultipleRequestAndResponse) {
* Test when we overflow max pending requests.
*/
TEST_F(Http1ConnPoolImplTest, MaxPendingRequests) {
cluster_.resource_manager_.reset(new Upstream::ResourceManagerImpl(1, 1, 1024, 1));
cluster_.resource_manager_.reset(
new Upstream::ResourceManagerImpl(runtime_, "fake_key", 1, 1, 1024, 1));

NiceMock<Http::MockStreamDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
Expand Down Expand Up @@ -448,7 +451,8 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) {
TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
InSequence s;

cluster_.resource_manager_.reset(new Upstream::ResourceManagerImpl(2, 1024, 1024, 1));
cluster_.resource_manager_.reset(
new Upstream::ResourceManagerImpl(runtime_, "fake_key", 2, 1024, 1024, 1));
ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection);
r1.startRequest();

Expand Down
Loading

0 comments on commit f095ab5

Please sign in to comment.