Skip to content

Commit

Permalink
tcp_proxy: add support for weighted clusters (#4430)
Browse files Browse the repository at this point in the history
* tcp_proxy: add support for weighted clusters

Signed-off-by: Venil Noronha <[email protected]>
  • Loading branch information
venilnoronha authored and ggreenway committed Sep 24, 2018
1 parent f1d31da commit 10625c5
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 55 deletions.
59 changes: 46 additions & 13 deletions api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,34 @@ message TcpProxy {
// <config_network_filters_tcp_proxy_stats>`.
string stat_prefix = 1 [(validate.rules).string.min_bytes = 1];

// The upstream cluster to connect to.
//
// .. note::
//
// Once full filter chain matching is implemented in listeners, this field will become the only
// way to configure the target cluster. All other matching will be done via :ref:`filter chain
// matching rules <envoy_api_msg_listener.FilterChainMatch>`. For very simple configurations,
// this field can still be used to select the cluster when no other matching rules are required.
// Otherwise, a :ref:`deprecated_v1
// <envoy_api_field_config.filter.network.tcp_proxy.v2.TcpProxy.deprecated_v1>` configuration is
// required to use more complex routing in the interim.
//
string cluster = 2;
oneof cluster_specifier {
option (validate.required) = true;

// The upstream cluster to connect to.
//
// .. note::
//
// Once full filter chain matching is implemented in listeners, this field will become the only
// way to configure the target cluster. All other matching will be done via :ref:`filter chain
// matching rules <envoy_api_msg_listener.FilterChainMatch>`. For very simple configurations,
// this field can still be used to select the cluster when no other matching rules are
// required. Otherwise, a :ref:`deprecated_v1
// <envoy_api_field_config.filter.network.tcp_proxy.v2.TcpProxy.deprecated_v1>` configuration
// is required to use more complex routing in the interim.
//
string cluster = 2;

// Multiple upstream clusters can be specified for a given route. The
// request is routed to one of the upstream clusters based on weights
// assigned to each cluster.
//
// .. note::
//
// This field is ignored if the :ref:`deprecated_v1
// <envoy_api_field_config.filter.network.tcp_proxy.v2.TcpProxy.deprecated_v1>`
// configuration is set.
WeightedCluster weighted_clusters = 10;
}

// Optional endpoint metadata match criteria. Only endpoints in the upstream
// cluster with metadata matching that set in metadata_match will be
Expand Down Expand Up @@ -131,4 +146,22 @@ message TcpProxy {
// The maximum number of unsuccessful connection attempts that will be made before
// giving up. If the parameter is not specified, 1 connection attempt will be made.
google.protobuf.UInt32Value max_connect_attempts = 7 [(validate.rules).uint32.gte = 1];

// Allows for specification of multiple upstream clusters along with weights
// that indicate the percentage of traffic to be forwarded to each cluster.
// The router selects an upstream cluster based on these weights.
message WeightedCluster {
message ClusterWeight {
// Name of the upstream cluster.
string name = 1 [(validate.rules).string.min_bytes = 1];

// When a request matches the route, the choice of an upstream cluster is
// determined by its weight. The sum of weights across all entries in the
// clusters array determines the total weight.
uint32 weight = 2 [(validate.rules).uint32.gte = 1];
}

// Specifies one or more upstream clusters associated with the route.
repeated ClusterWeight clusters = 1 [(validate.rules).repeated .min_items = 1];
}
}
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Version history
* rbac network filter: a :ref:`role-based access control network filter <config_network_filters_rbac>` has been added.
* rest-api: added ability to set the :ref:`request timeout <envoy_api_field_core.ApiConfigSource.request_timeout>` for REST API requests.
* router: added ability to set request/response headers at the :ref:`envoy_api_msg_route.Route` level.
* tcp_proxy: added support for :ref:`weighted clusters <envoy_api_field_config.filter.network.tcp_proxy.v2.TcpProxy.weighted_clusters>`.
* tracing: added support for configuration of :ref:`tracing sampling
<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.tracing>`.
* thrift_proxy: introduced thrift routing, moved configuration to correct location
Expand Down
49 changes: 47 additions & 2 deletions source/common/common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class StringUtil {
};

/**
* Utilities for finding primes
* Utilities for finding primes.
*/
class Primes {
public:
Expand All @@ -411,13 +411,58 @@ class RegexUtil {
* Constructs a std::regex, converting any std::regex_error exception into an EnvoyException.
* @param regex std::string containing the regular expression to parse.
* @param flags std::regex::flag_type containing parser flags. Defaults to std::regex::optimize.
* @return std::regex constructed from regex and flags
* @return std::regex constructed from regex and flags.
* @throw EnvoyException if the regex string is invalid.
*/
static std::regex parseRegex(const std::string& regex,
std::regex::flag_type flags = std::regex::optimize);
};

/**
* Utilities for working with weighted clusters.
*/
class WeightedClusterUtil {
public:
/*
* Returns a WeightedClusterEntry from the given weighted clusters based on
* the total cluster weight and a random value.
* @param weighted_clusters a vector of WeightedClusterEntry instances.
* @param total_cluster_weight the total weight of all clusters.
* @param random_value the random value.
* @param ignore_overflow whether to ignore cluster weight overflows.
* @return a WeightedClusterEntry.
*/
template <typename WeightedClusterEntry>
static const WeightedClusterEntry&
pickCluster(const std::vector<WeightedClusterEntry>& weighted_clusters,
const uint64_t total_cluster_weight, const uint64_t random_value,
const bool ignore_overflow) {
uint64_t selected_value = random_value % total_cluster_weight;
uint64_t begin = 0;
uint64_t end = 0;

// Find the right cluster to route to based on the interval in which
// the selected value falls. The intervals are determined as
// [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),..
for (const WeightedClusterEntry& cluster : weighted_clusters) {
end = begin + cluster->clusterWeight();
if (!ignore_overflow) {
// end > total_cluster_weight: This case can only occur with Runtimes,
// when the user specifies invalid weights such that
// sum(weights) > total_cluster_weight.
ASSERT(end <= total_cluster_weight);
}

if (selected_value >= begin && selected_value < end) {
return cluster;
}
begin = end;
}

NOT_REACHED_GCOVR_EXCL_LINE;
}
};

/**
* Maintains sets of numeric intervals. As new intervals are added, existing ones in the
* set are combined so that no overlapping intervals remain in the representation.
Expand Down
20 changes: 2 additions & 18 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,24 +539,8 @@ RouteConstSharedPtr RouteEntryImplBase::clusterEntry(const Http::HeaderMap& head
}
}

uint64_t selected_value = random_value % total_cluster_weight_;
uint64_t begin = 0UL;
uint64_t end = 0UL;

// Find the right cluster to route to based on the interval in which
// the selected value falls. The intervals are determined as
// [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),..
for (const WeightedClusterEntrySharedPtr& cluster : weighted_clusters_) {
end = begin + cluster->clusterWeight();
if (((selected_value >= begin) && (selected_value < end)) || (end >= total_cluster_weight_)) {
// end > total_cluster_weight_: This case can only occur with Runtimes, when the user
// specifies invalid weights such that sum(weights) > total_cluster_weight_. In this case,
// terminate the search and just return the cluster whose weight caused the overflow.
return cluster;
}
begin = end;
}
NOT_REACHED_GCOVR_EXCL_LINE;
return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,
true);
}

void RouteEntryImplBase::validateClusters(Upstream::ClusterManager& cm) const {
Expand Down
33 changes: 31 additions & 2 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/common/fmt.h"
#include "common/common/utility.h"
#include "common/config/well_known_names.h"
#include "common/router/metadatamatchcriteria_impl.h"

Expand All @@ -37,6 +38,11 @@ Config::Route::Route(
}
}

Config::WeightedClusterEntry::WeightedClusterEntry(
const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight&
config)
: cluster_name_(config.name()), cluster_weight_(config.weight()) {}

Config::SharedConfig::SharedConfig(
const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& config,
Server::Configuration::FactoryContext& context)
Expand All @@ -52,7 +58,8 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co
Server::Configuration::FactoryContext& context)
: max_connect_attempts_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_connect_attempts, 1)),
upstream_drain_manager_slot_(context.threadLocal().allocateSlot()),
shared_config_(std::make_shared<SharedConfig>(config, context)) {
shared_config_(std::make_shared<SharedConfig>(config, context)),
random_generator_(context.random()) {

upstream_drain_manager_slot_->set([](Event::Dispatcher&) {
return ThreadLocal::ThreadLocalObjectSharedPtr(new UpstreamDrainManager());
Expand All @@ -71,6 +78,19 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co
routes_.emplace_back(default_route);
}

// Weighted clusters will be enabled only if both the default cluster and
// deprecated v1 routes are absent.
if (routes_.empty() && config.has_weighted_clusters()) {
total_cluster_weight_ = 0;
for (const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::
ClusterWeight& cluster_desc : config.weighted_clusters().clusters()) {
std::unique_ptr<WeightedClusterEntry> cluster_entry(
std::make_unique<WeightedClusterEntry>(cluster_desc));
weighted_clusters_.emplace_back(std::move(cluster_entry));
total_cluster_weight_ += weighted_clusters_.back()->clusterWeight();
}
}

if (config.has_metadata_match()) {
const auto& filter_metadata = config.metadata_match().filter_metadata();

Expand All @@ -87,7 +107,7 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co
}
}

const std::string& Config::getRouteFromEntries(Network::Connection& connection) {
const std::string& Config::getRegularRouteFromEntries(Network::Connection& connection) {
for (const Config::Route& route : routes_) {
if (!route.source_port_ranges_.empty() &&
!Network::Utility::portInRangeList(*connection.remoteAddress(),
Expand Down Expand Up @@ -118,6 +138,15 @@ const std::string& Config::getRouteFromEntries(Network::Connection& connection)
return EMPTY_STRING;
}

const std::string& Config::getRouteFromEntries(Network::Connection& connection) {
if (weighted_clusters_.empty()) {
return getRegularRouteFromEntries(connection);
}
return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_,
random_generator_.random(), false)
->clusterName();
}

UpstreamDrainManager& Config::drainManager() {
return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}
Expand Down
19 changes: 19 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "envoy/event/timer.h"
#include "envoy/network/connection.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/server/filter_config.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
Expand Down Expand Up @@ -100,6 +101,7 @@ class Config {
* If no route applies, returns the empty string.
*/
const std::string& getRouteFromEntries(Network::Connection& connection);
const std::string& getRegularRouteFromEntries(Network::Connection& connection);

const TcpProxyStats& stats() { return shared_config_->stats(); }
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() { return access_logs_; }
Expand All @@ -125,12 +127,29 @@ class Config {
std::string cluster_name_;
};

class WeightedClusterEntry {
public:
WeightedClusterEntry(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::
WeightedCluster::ClusterWeight& config);

const std::string& clusterName() const { return cluster_name_; }
uint64_t clusterWeight() const { return cluster_weight_; }

private:
const std::string cluster_name_;
const uint64_t cluster_weight_;
};
typedef std::unique_ptr<WeightedClusterEntry> WeightedClusterEntrySharedPtr;

std::vector<Route> routes_;
std::vector<WeightedClusterEntrySharedPtr> weighted_clusters_;
uint64_t total_cluster_weight_;
std::vector<AccessLog::InstanceSharedPtr> access_logs_;
const uint32_t max_connect_attempts_;
ThreadLocal::SlotPtr upstream_drain_manager_slot_;
SharedConfigSharedPtr shared_config_;
std::unique_ptr<const Router::MetadataMatchCriteria> cluster_metadata_match_criteria_;
Runtime::RandomGenerator& random_generator_;
};

typedef std::shared_ptr<Config> ConfigSharedPtr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,8 @@ RouteConstSharedPtr RouteEntryImplBase::clusterEntry(uint64_t random_value) cons
if (weighted_clusters_.empty()) {
return shared_from_this();
}

uint64_t selected_value = random_value % total_cluster_weight_;
uint64_t begin = 0UL;
uint64_t end = 0UL;

// Find the right cluster to route to based on the interval in which
// the selected value falls. The intervals are determined as
// [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),..
for (const WeightedClusterEntrySharedPtr& cluster : weighted_clusters_) {
end = begin + cluster->clusterWeight();
ASSERT(end <= total_cluster_weight_);

if (selected_value >= begin && selected_value < end) {
return cluster;
}

begin = end;
}

NOT_REACHED_GCOVR_EXCL_LINE;
return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,
false);
}

bool RouteEntryImplBase::headersMatch(const Http::HeaderMap& headers) const {
Expand Down
27 changes: 27 additions & 0 deletions test/common/common/utility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,33 @@ TEST(RegexUtil, parseRegex) {
}
}

class WeightedClusterEntry {
public:
WeightedClusterEntry(const std::string name, const uint64_t weight)
: name_(name), weight_(weight) {}

const std::string& clusterName() const { return name_; }
uint64_t clusterWeight() const { return weight_; }

private:
const std::string name_;
const uint64_t weight_;
};
typedef std::shared_ptr<WeightedClusterEntry> WeightedClusterEntrySharedPtr;

TEST(WeightedClusterUtil, pickCluster) {
std::vector<WeightedClusterEntrySharedPtr> clusters;

std::unique_ptr<WeightedClusterEntry> cluster1(new WeightedClusterEntry("cluster1", 10));
clusters.emplace_back(std::move(cluster1));

std::unique_ptr<WeightedClusterEntry> cluster2(new WeightedClusterEntry("cluster2", 90));
clusters.emplace_back(std::move(cluster2));

EXPECT_EQ("cluster1", WeightedClusterUtil::pickCluster(clusters, 100, 5, false)->clusterName());
EXPECT_EQ("cluster2", WeightedClusterUtil::pickCluster(clusters, 80, 79, true)->clusterName());
}

static std::string intervalSetIntToString(const IntervalSetImpl<int>& interval_set) {
std::string out;
const char* prefix = "";
Expand Down
14 changes: 14 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,20 @@ class TcpProxyTest : public testing::Test {
Network::Address::InstanceConstSharedPtr upstream_remote_address_;
};

TEST_F(TcpProxyTest, DefaultRoutes) {
envoy::config::filter::network::tcp_proxy::v2::TcpProxy config = defaultConfig();

envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight*
ignored_cluster = config.mutable_weighted_clusters()->mutable_clusters()->Add();
ignored_cluster->set_name("ignored_cluster");
ignored_cluster->set_weight(10);

configure(config);

NiceMock<Network::MockConnection> connection;
EXPECT_EQ(std::string("fake_cluster"), config_->getRouteFromEntries(connection));
}

// Tests that half-closes are proxied and don't themselves cause any connection to be closed.
TEST_F(TcpProxyTest, HalfCloseProxy) {
setup(1);
Expand Down

0 comments on commit 10625c5

Please sign in to comment.