diff --git a/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto b/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto index d09952daf31f..c0cced1f1a58 100644 --- a/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto +++ b/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto @@ -21,19 +21,34 @@ message TcpProxy { // `. string stat_prefix = 1 [(validate.rules).string.min_bytes = 1]; - // The upstream cluster to connect to. - // - // .. note:: - // - // Once full filter chain matching is implemented in listeners, this field will become the only - // way to configure the target cluster. All other matching will be done via :ref:`filter chain - // matching rules `. For very simple configurations, - // this field can still be used to select the cluster when no other matching rules are required. - // Otherwise, a :ref:`deprecated_v1 - // ` configuration is - // required to use more complex routing in the interim. - // - string cluster = 2; + oneof cluster_specifier { + option (validate.required) = true; + + // The upstream cluster to connect to. + // + // .. note:: + // + // Once full filter chain matching is implemented in listeners, this field will become the only + // way to configure the target cluster. All other matching will be done via :ref:`filter chain + // matching rules `. For very simple configurations, + // this field can still be used to select the cluster when no other matching rules are + // required. Otherwise, a :ref:`deprecated_v1 + // ` configuration + // is required to use more complex routing in the interim. + // + string cluster = 2; + + // Multiple upstream clusters can be specified for a given route. The + // request is routed to one of the upstream clusters based on weights + // assigned to each cluster. + // + // .. note:: + // + // This field is ignored if the :ref:`deprecated_v1 + // ` + // configuration is set. + WeightedCluster weighted_clusters = 10; + } // Optional endpoint metadata match criteria. Only endpoints in the upstream // cluster with metadata matching that set in metadata_match will be @@ -131,4 +146,22 @@ message TcpProxy { // The maximum number of unsuccessful connection attempts that will be made before // giving up. If the parameter is not specified, 1 connection attempt will be made. google.protobuf.UInt32Value max_connect_attempts = 7 [(validate.rules).uint32.gte = 1]; + + // Allows for specification of multiple upstream clusters along with weights + // that indicate the percentage of traffic to be forwarded to each cluster. + // The router selects an upstream cluster based on these weights. + message WeightedCluster { + message ClusterWeight { + // Name of the upstream cluster. + string name = 1 [(validate.rules).string.min_bytes = 1]; + + // When a request matches the route, the choice of an upstream cluster is + // determined by its weight. The sum of weights across all entries in the + // clusters array determines the total weight. + uint32 weight = 2 [(validate.rules).uint32.gte = 1]; + } + + // Specifies one or more upstream clusters associated with the route. + repeated ClusterWeight clusters = 1 [(validate.rules).repeated .min_items = 1]; + } } diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index df98978994f8..d22bfc6369df 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -71,6 +71,7 @@ Version history * rbac network filter: a :ref:`role-based access control network filter ` has been added. * rest-api: added ability to set the :ref:`request timeout ` for REST API requests. * router: added ability to set request/response headers at the :ref:`envoy_api_msg_route.Route` level. +* tcp_proxy: added support for :ref:`weighted clusters `. * tracing: added support for configuration of :ref:`tracing sampling `. * thrift_proxy: introduced thrift routing, moved configuration to correct location diff --git a/source/common/common/utility.h b/source/common/common/utility.h index c9d6101702f4..f8cd45af9227 100644 --- a/source/common/common/utility.h +++ b/source/common/common/utility.h @@ -387,7 +387,7 @@ class StringUtil { }; /** - * Utilities for finding primes + * Utilities for finding primes. */ class Primes { public: @@ -411,13 +411,58 @@ class RegexUtil { * Constructs a std::regex, converting any std::regex_error exception into an EnvoyException. * @param regex std::string containing the regular expression to parse. * @param flags std::regex::flag_type containing parser flags. Defaults to std::regex::optimize. - * @return std::regex constructed from regex and flags + * @return std::regex constructed from regex and flags. * @throw EnvoyException if the regex string is invalid. */ static std::regex parseRegex(const std::string& regex, std::regex::flag_type flags = std::regex::optimize); }; +/** + * Utilities for working with weighted clusters. + */ +class WeightedClusterUtil { +public: + /* + * Returns a WeightedClusterEntry from the given weighted clusters based on + * the total cluster weight and a random value. + * @param weighted_clusters a vector of WeightedClusterEntry instances. + * @param total_cluster_weight the total weight of all clusters. + * @param random_value the random value. + * @param ignore_overflow whether to ignore cluster weight overflows. + * @return a WeightedClusterEntry. + */ + template + static const WeightedClusterEntry& + pickCluster(const std::vector& weighted_clusters, + const uint64_t total_cluster_weight, const uint64_t random_value, + const bool ignore_overflow) { + uint64_t selected_value = random_value % total_cluster_weight; + uint64_t begin = 0; + uint64_t end = 0; + + // Find the right cluster to route to based on the interval in which + // the selected value falls. The intervals are determined as + // [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),.. + for (const WeightedClusterEntry& cluster : weighted_clusters) { + end = begin + cluster->clusterWeight(); + if (!ignore_overflow) { + // end > total_cluster_weight: This case can only occur with Runtimes, + // when the user specifies invalid weights such that + // sum(weights) > total_cluster_weight. + ASSERT(end <= total_cluster_weight); + } + + if (selected_value >= begin && selected_value < end) { + return cluster; + } + begin = end; + } + + NOT_REACHED_GCOVR_EXCL_LINE; + } +}; + /** * Maintains sets of numeric intervals. As new intervals are added, existing ones in the * set are combined so that no overlapping intervals remain in the representation. diff --git a/source/common/router/config_impl.cc b/source/common/router/config_impl.cc index a48bf4e3baef..c977aba850b5 100644 --- a/source/common/router/config_impl.cc +++ b/source/common/router/config_impl.cc @@ -552,24 +552,8 @@ RouteConstSharedPtr RouteEntryImplBase::clusterEntry(const Http::HeaderMap& head } } - uint64_t selected_value = random_value % total_cluster_weight_; - uint64_t begin = 0UL; - uint64_t end = 0UL; - - // Find the right cluster to route to based on the interval in which - // the selected value falls. The intervals are determined as - // [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),.. - for (const WeightedClusterEntrySharedPtr& cluster : weighted_clusters_) { - end = begin + cluster->clusterWeight(); - if (((selected_value >= begin) && (selected_value < end)) || (end >= total_cluster_weight_)) { - // end > total_cluster_weight_: This case can only occur with Runtimes, when the user - // specifies invalid weights such that sum(weights) > total_cluster_weight_. In this case, - // terminate the search and just return the cluster whose weight caused the overflow. - return cluster; - } - begin = end; - } - NOT_REACHED_GCOVR_EXCL_LINE; + return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value, + true); } void RouteEntryImplBase::validateClusters(Upstream::ClusterManager& cm) const { diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index e4517f158111..2d459f03d801 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -15,6 +15,7 @@ #include "common/common/assert.h" #include "common/common/empty_string.h" #include "common/common/fmt.h" +#include "common/common/utility.h" #include "common/config/well_known_names.h" #include "common/router/metadatamatchcriteria_impl.h" @@ -37,6 +38,11 @@ Config::Route::Route( } } +Config::WeightedClusterEntry::WeightedClusterEntry( + const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight& + config) + : cluster_name_(config.name()), cluster_weight_(config.weight()) {} + Config::SharedConfig::SharedConfig( const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& config, Server::Configuration::FactoryContext& context) @@ -52,7 +58,8 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co Server::Configuration::FactoryContext& context) : max_connect_attempts_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_connect_attempts, 1)), upstream_drain_manager_slot_(context.threadLocal().allocateSlot()), - shared_config_(std::make_shared(config, context)) { + shared_config_(std::make_shared(config, context)), + random_generator_(context.random()) { upstream_drain_manager_slot_->set([](Event::Dispatcher&) { return ThreadLocal::ThreadLocalObjectSharedPtr(new UpstreamDrainManager()); @@ -71,6 +78,19 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co routes_.emplace_back(default_route); } + // Weighted clusters will be enabled only if both the default cluster and + // deprecated v1 routes are absent. + if (routes_.empty() && config.has_weighted_clusters()) { + total_cluster_weight_ = 0; + for (const envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster:: + ClusterWeight& cluster_desc : config.weighted_clusters().clusters()) { + std::unique_ptr cluster_entry( + std::make_unique(cluster_desc)); + weighted_clusters_.emplace_back(std::move(cluster_entry)); + total_cluster_weight_ += weighted_clusters_.back()->clusterWeight(); + } + } + if (config.has_metadata_match()) { const auto& filter_metadata = config.metadata_match().filter_metadata(); @@ -87,7 +107,7 @@ Config::Config(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy& co } } -const std::string& Config::getRouteFromEntries(Network::Connection& connection) { +const std::string& Config::getRegularRouteFromEntries(Network::Connection& connection) { for (const Config::Route& route : routes_) { if (!route.source_port_ranges_.empty() && !Network::Utility::portInRangeList(*connection.remoteAddress(), @@ -118,6 +138,15 @@ const std::string& Config::getRouteFromEntries(Network::Connection& connection) return EMPTY_STRING; } +const std::string& Config::getRouteFromEntries(Network::Connection& connection) { + if (weighted_clusters_.empty()) { + return getRegularRouteFromEntries(connection); + } + return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, + random_generator_.random(), false) + ->clusterName(); +} + UpstreamDrainManager& Config::drainManager() { return upstream_drain_manager_slot_->getTyped(); } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 3fa588f3702d..dfd8b0bef8af 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -11,6 +11,7 @@ #include "envoy/event/timer.h" #include "envoy/network/connection.h" #include "envoy/network/filter.h" +#include "envoy/runtime/runtime.h" #include "envoy/server/filter_config.h" #include "envoy/stats/scope.h" #include "envoy/stats/stats_macros.h" @@ -100,6 +101,7 @@ class Config { * If no route applies, returns the empty string. */ const std::string& getRouteFromEntries(Network::Connection& connection); + const std::string& getRegularRouteFromEntries(Network::Connection& connection); const TcpProxyStats& stats() { return shared_config_->stats(); } const std::vector& accessLogs() { return access_logs_; } @@ -125,12 +127,29 @@ class Config { std::string cluster_name_; }; + class WeightedClusterEntry { + public: + WeightedClusterEntry(const envoy::config::filter::network::tcp_proxy::v2::TcpProxy:: + WeightedCluster::ClusterWeight& config); + + const std::string& clusterName() const { return cluster_name_; } + uint64_t clusterWeight() const { return cluster_weight_; } + + private: + const std::string cluster_name_; + const uint64_t cluster_weight_; + }; + typedef std::unique_ptr WeightedClusterEntrySharedPtr; + std::vector routes_; + std::vector weighted_clusters_; + uint64_t total_cluster_weight_; std::vector access_logs_; const uint32_t max_connect_attempts_; ThreadLocal::SlotPtr upstream_drain_manager_slot_; SharedConfigSharedPtr shared_config_; std::unique_ptr cluster_metadata_match_criteria_; + Runtime::RandomGenerator& random_generator_; }; typedef std::shared_ptr ConfigSharedPtr; diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 1171b758b647..bdf87ee755d0 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -42,26 +42,8 @@ RouteConstSharedPtr RouteEntryImplBase::clusterEntry(uint64_t random_value) cons if (weighted_clusters_.empty()) { return shared_from_this(); } - - uint64_t selected_value = random_value % total_cluster_weight_; - uint64_t begin = 0UL; - uint64_t end = 0UL; - - // Find the right cluster to route to based on the interval in which - // the selected value falls. The intervals are determined as - // [0, cluster1_weight), [cluster1_weight, cluster1_weight+cluster2_weight),.. - for (const WeightedClusterEntrySharedPtr& cluster : weighted_clusters_) { - end = begin + cluster->clusterWeight(); - ASSERT(end <= total_cluster_weight_); - - if (selected_value >= begin && selected_value < end) { - return cluster; - } - - begin = end; - } - - NOT_REACHED_GCOVR_EXCL_LINE; + return WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value, + false); } bool RouteEntryImplBase::headersMatch(const Http::HeaderMap& headers) const { diff --git a/test/common/common/utility_test.cc b/test/common/common/utility_test.cc index 386a3d3503a5..4081345c58ca 100644 --- a/test/common/common/utility_test.cc +++ b/test/common/common/utility_test.cc @@ -470,6 +470,33 @@ TEST(RegexUtil, parseRegex) { } } +class WeightedClusterEntry { +public: + WeightedClusterEntry(const std::string name, const uint64_t weight) + : name_(name), weight_(weight) {} + + const std::string& clusterName() const { return name_; } + uint64_t clusterWeight() const { return weight_; } + +private: + const std::string name_; + const uint64_t weight_; +}; +typedef std::shared_ptr WeightedClusterEntrySharedPtr; + +TEST(WeightedClusterUtil, pickCluster) { + std::vector clusters; + + std::unique_ptr cluster1(new WeightedClusterEntry("cluster1", 10)); + clusters.emplace_back(std::move(cluster1)); + + std::unique_ptr cluster2(new WeightedClusterEntry("cluster2", 90)); + clusters.emplace_back(std::move(cluster2)); + + EXPECT_EQ("cluster1", WeightedClusterUtil::pickCluster(clusters, 100, 5, false)->clusterName()); + EXPECT_EQ("cluster2", WeightedClusterUtil::pickCluster(clusters, 80, 79, true)->clusterName()); +} + static std::string intervalSetIntToString(const IntervalSetImpl& interval_set) { std::string out; const char* prefix = ""; diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 1df01629ffa8..b964dfff83c8 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -481,6 +481,20 @@ class TcpProxyTest : public testing::Test { Network::Address::InstanceConstSharedPtr upstream_remote_address_; }; +TEST_F(TcpProxyTest, DefaultRoutes) { + envoy::config::filter::network::tcp_proxy::v2::TcpProxy config = defaultConfig(); + + envoy::config::filter::network::tcp_proxy::v2::TcpProxy::WeightedCluster::ClusterWeight* + ignored_cluster = config.mutable_weighted_clusters()->mutable_clusters()->Add(); + ignored_cluster->set_name("ignored_cluster"); + ignored_cluster->set_weight(10); + + configure(config); + + NiceMock connection; + EXPECT_EQ(std::string("fake_cluster"), config_->getRouteFromEntries(connection)); +} + // Tests that half-closes are proxied and don't themselves cause any connection to be closed. TEST_F(TcpProxyTest, HalfCloseProxy) { setup(1);