Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcp_proxy: add support for weighted clusters #4430

Merged
merged 8 commits into from
Sep 24, 2018
53 changes: 40 additions & 13 deletions api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,28 @@ message TcpProxy {
// <config_network_filters_tcp_proxy_stats>`.
string stat_prefix = 1 [(validate.rules).string.min_bytes = 1];

// The upstream cluster to connect to.
//
// .. note::
//
// Once full filter chain matching is implemented in listeners, this field will become the only
// way to configure the target cluster. All other matching will be done via :ref:`filter chain
// matching rules <envoy_api_msg_listener.FilterChainMatch>`. For very simple configurations,
// this field can still be used to select the cluster when no other matching rules are required.
// Otherwise, a :ref:`deprecated_v1
// <envoy_api_field_config.filter.network.tcp_proxy.v2.TcpProxy.deprecated_v1>` configuration is
// required to use more complex routing in the interim.
//
string cluster = 2;
oneof cluster_specifier {
option (validate.required) = true;

// The upstream cluster to connect to.
//
// .. note::
//
// Once full filter chain matching is implemented in listeners, this field will become the only
// way to configure the target cluster. All other matching will be done via :ref:`filter chain
// matching rules <envoy_api_msg_listener.FilterChainMatch>`. For very simple configurations,
// this field can still be used to select the cluster when no other matching rules are
// required. Otherwise, a :ref:`deprecated_v1
// <envoy_api_field_config.filter.network.tcp_proxy.v2.TcpProxy.deprecated_v1>` configuration
// is required to use more complex routing in the interim.
//
string cluster = 2;

// Multiple upstream clusters can be specified for a given route. The
// request is routed to one of the upstream clusters based on weights
// assigned to each cluster.
venilnoronha marked this conversation as resolved.
Show resolved Hide resolved
WeightedCluster weighted_clusters = 10;
}

// Optional endpoint metadata match criteria. Only endpoints in the upstream
// cluster with metadata matching that set in metadata_match will be
Expand Down Expand Up @@ -131,4 +140,22 @@ message TcpProxy {
// The maximum number of unsuccessful connection attempts that will be made before
// giving up. If the parameter is not specified, 1 connection attempt will be made.
google.protobuf.UInt32Value max_connect_attempts = 7 [(validate.rules).uint32.gte = 1];

// Allows for specification of multiple upstream clusters along with weights
// that indicate the percentage of traffic to be forwarded to each cluster.
// The router selects an upstream cluster based on these weights.
message WeightedCluster {
message ClusterWeight {
// Name of the upstream cluster.
string name = 1 [(validate.rules).string.min_bytes = 1];

// When a request matches the route, the choice of an upstream cluster is
// determined by its weight. The sum of weights across all entries in the
// clusters array determines the total weight.
uint32 weight = 2 [(validate.rules).uint32.gte = 1];
venilnoronha marked this conversation as resolved.
Show resolved Hide resolved
}

// Specifies one or more upstream clusters associated with the route.
repeated ClusterWeight clusters = 1 [(validate.rules).repeated .min_items = 1];
}
}
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Version history
* rbac network filter: a :ref:`role-based access control network filter <config_network_filters_rbac>` has been added.
* rest-api: added ability to set the :ref:`request timeout <envoy_api_field_core.ApiConfigSource.request_timeout>` for REST API requests.
* router: added ability to set request/response headers at the :ref:`envoy_api_msg_route.Route` level.
* tcp_proxy: added support for :ref:`weighted clusters <envoy_api_field_config.filter.network.tcp_proxy.v2.TcpProxy.weighted_clusters>`.
* tracing: added support for configuration of :ref:`tracing sampling
<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.tracing>`.
* thrift_proxy: introduced thrift routing, moved configuration to correct location
Expand Down
50 changes: 48 additions & 2 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ Config::Route::Route(
}
}

Config::WeightedClusterEntry::WeightedClusterEntry(
const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight&
config)
: cluster_name_(config.name()), cluster_weight_(config.weight()) {}

Config::SharedConfig::SharedConfig(
const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& config,
Server::Configuration::FactoryContext& context)
Expand All @@ -52,7 +57,8 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co
Server::Configuration::FactoryContext& context)
: max_connect_attempts_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_connect_attempts, 1)),
upstream_drain_manager_slot_(context.threadLocal().allocateSlot()),
shared_config_(std::make_shared<SharedConfig>(config, context)) {
shared_config_(std::make_shared<SharedConfig>(config, context)),
random_generator_(context.random()) {

upstream_drain_manager_slot_->set([](Event::Dispatcher&) {
return ThreadLocal::ThreadLocalObjectSharedPtr(new UpstreamDrainManager());
Expand All @@ -71,6 +77,17 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co
routes_.emplace_back(default_route);
}

// Weighted clusters will be enabled only if both the default cluster and
// deprecated v1 routes are absent.
venilnoronha marked this conversation as resolved.
Show resolved Hide resolved
if (routes_.empty() && config.has_weighted_clusters()) {
total_cluster_weight_ = 0UL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0; (UL suffix isn't needed).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 99352da.

for (const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::
ClusterWeight& cluster_desc : config.weighted_clusters().clusters()) {
weighted_clusters_.emplace_back(WeightedClusterEntry(cluster_desc));
total_cluster_weight_ += weighted_clusters_.back().cluster_weight_;
}
}

if (config.has_metadata_match()) {
const auto& filter_metadata = config.metadata_match().filter_metadata();

Expand All @@ -87,7 +104,7 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co
}
}

const std::string& Config::getRouteFromEntries(Network::Connection& connection) {
const std::string& Config::getRegularRouteFromEntries(Network::Connection& connection) {
for (const Config::Route& route : routes_) {
if (!route.source_port_ranges_.empty() &&
!Network::Utility::portInRangeList(*connection.remoteAddress(),
Expand Down Expand Up @@ -118,6 +135,35 @@ const std::string& Config::getRouteFromEntries(Network::Connection& connection)
return EMPTY_STRING;
}

const std::string& Config::getWeightedClusterRoute(const uint64_t random_value) {
uint64_t selected_value = random_value % total_cluster_weight_;
uint64_t begin = 0UL;
uint64_t end = 0UL;

// Find the right cluster to route to based on the interval in which
// the selected value falls. The intervals are determined as
// [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),..
for (const WeightedClusterEntry& cluster : weighted_clusters_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is duplicate of the same thing in thrift_proxy. Is there any way to share the code for this algorithm? Maybe a templatized function just for the algorithm, so you don't need to make the entries be a common type?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And that is a dupe of code in hcm

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 9dce4f5.

end = begin + cluster.cluster_weight_;
ASSERT(end <= total_cluster_weight_);

if (selected_value >= begin && selected_value < end) {
return cluster.cluster_name_;
}
begin = end;
}

NOT_REACHED_GCOVR_EXCL_LINE;
}

const std::string& Config::getRouteFromEntries(Network::Connection& connection) {
if (!weighted_clusters_.empty()) {
return getWeightedClusterRoute(random_generator_.random());
} else {
return getRegularRouteFromEntries(connection);
}
}

UpstreamDrainManager& Config::drainManager() {
return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}
Expand Down
14 changes: 14 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "envoy/event/timer.h"
#include "envoy/network/connection.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/server/filter_config.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
Expand Down Expand Up @@ -100,6 +101,8 @@ class Config {
* If no route applies, returns the empty string.
*/
const std::string& getRouteFromEntries(Network::Connection& connection);
const std::string& getRegularRouteFromEntries(Network::Connection& connection);
const std::string& getWeightedClusterRoute(const uint64_t random_value);

const TcpProxyStats& stats() { return shared_config_->stats(); }
const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() { return access_logs_; }
Expand All @@ -125,12 +128,23 @@ class Config {
std::string cluster_name_;
};

struct WeightedClusterEntry {
WeightedClusterEntry(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::
WeightedCluster::ClusterWeight& config);

const std::string cluster_name_;
const uint64_t cluster_weight_;
};

std::vector<Route> routes_;
std::vector<WeightedClusterEntry> weighted_clusters_;
uint64_t total_cluster_weight_;
std::vector<AccessLog::InstanceSharedPtr> access_logs_;
const uint32_t max_connect_attempts_;
ThreadLocal::SlotPtr upstream_drain_manager_slot_;
SharedConfigSharedPtr shared_config_;
std::unique_ptr<const Router::MetadataMatchCriteria> cluster_metadata_match_criteria_;
Runtime::RandomGenerator& random_generator_;
};

typedef std::shared_ptr<Config> ConfigSharedPtr;
Expand Down
33 changes: 33 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,39 @@ class TcpProxyTest : public testing::Test {
Network::Address::InstanceConstSharedPtr upstream_remote_address_;
};

TEST_F(TcpProxyTest, WeightedClusters) {
envoy::config::filter::network::tcp_proxy::v2::TcpProxy config;

envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight*
cluster1 = config.mutable_weighted_clusters()->mutable_clusters()->Add();
cluster1->set_name("cluster1");
cluster1->set_weight(10);

envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight*
cluster2 = config.mutable_weighted_clusters()->mutable_clusters()->Add();
cluster2->set_name("cluster2");
cluster2->set_weight(90);

configure(config);

EXPECT_EQ(std::string("cluster1"), config_->getWeightedClusterRoute(5));
EXPECT_EQ(std::string("cluster2"), config_->getWeightedClusterRoute(25));
}

TEST_F(TcpProxyTest, DefaultRoutes) {
envoy::config::filter::network::tcp_proxy::v2::TcpProxy config = defaultConfig();

envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight*
ignored_cluster = config.mutable_weighted_clusters()->mutable_clusters()->Add();
ignored_cluster->set_name("ignored_cluster");
ignored_cluster->set_weight(10);

configure(config);

NiceMock<Network::MockConnection> connection;
EXPECT_EQ(std::string("fake_cluster"), config_->getRouteFromEntries(connection));
}

// Tests that half-closes are proxied and don't themselves cause any connection to be closed.
TEST_F(TcpProxyTest, HalfCloseProxy) {
setup(1);
Expand Down