diff --git a/api/envoy/config/cluster/v3/cluster.proto b/api/envoy/config/cluster/v3/cluster.proto index efc08ae0700a..2dee46edfc4e 100644 --- a/api/envoy/config/cluster/v3/cluster.proto +++ b/api/envoy/config/cluster/v3/cluster.proto @@ -438,6 +438,17 @@ message Cluster { bool use_http_header = 1; } + message LbShuffleShardConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.api.v2.Cluster.LbShuffleShardConfig"; + + google.protobuf.UInt32Value endpoints_per_cell = 1; + + bool use_zone_as_dimension = 2; + + repeated string dimensions = 3; + } + // Common configuration for all load balancer implementations. // [#next-free-field: 8] message CommonLbConfig { @@ -887,6 +898,9 @@ message Cluster { // Configuration for load balancing subsetting. LbSubsetConfig lb_subset_config = 22; + // Configuration for the ShuffleShard. + LbShuffleShardConfig lb_shuffle_shard_config = 53; + // Optional configuration for the load balancing algorithm selected by // LbPolicy. Currently only // :ref:`RING_HASH`, diff --git a/generated_api_shadow/envoy/config/cluster/v3/cluster.proto b/generated_api_shadow/envoy/config/cluster/v3/cluster.proto index ef6801a300d9..28699a3bbdb9 100644 --- a/generated_api_shadow/envoy/config/cluster/v3/cluster.proto +++ b/generated_api_shadow/envoy/config/cluster/v3/cluster.proto @@ -439,6 +439,17 @@ message Cluster { bool use_http_header = 1; } + message LbShuffleShardConfig { + option (udpa.annotations.versioning).previous_message_type = + "envoy.api.v2.Cluster.LbShuffleShardConfig"; + + google.protobuf.UInt32Value endpoints_per_cell = 1; + + bool use_zone_as_dimension = 2; + + repeated string dimensions = 3; + } + // Common configuration for all load balancer implementations. // [#next-free-field: 8] message CommonLbConfig { @@ -886,6 +897,9 @@ message Cluster { // Configuration for load balancing subsetting. LbSubsetConfig lb_subset_config = 22; + // Configuration for the ShuffleShard. + LbShuffleShardConfig lb_shuffle_shard_config = 53; + // Optional configuration for the load balancing algorithm selected by // LbPolicy. Currently only // :ref:`RING_HASH`, diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index e1b9e9c8a115..df1909fd1e58 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -809,6 +809,12 @@ class ClusterInfo { virtual const absl::optional& lbLeastRequestConfig() const PURE; + /** + * @return configuration for shuffle shard load balancing. + */ + virtual const absl::optional& + lbShuffleShardConfig() const PURE; + /** * @return configuration for ring hash load balancing, only used if type is set to ring_hash_lb. */ diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index 8ae9aa9b11e4..183114b2fef1 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -176,7 +176,7 @@ envoy_cc_library( envoy_cc_library( name = "load_balancer_lib", srcs = ["load_balancer_impl.cc"], - hdrs = ["load_balancer_impl.h"], + hdrs = ["load_balancer_impl.h", "infima.h"], deps = [ ":edf_scheduler_lib", "//include/envoy/common:random_generator_interface", @@ -185,6 +185,7 @@ envoy_cc_library( "//include/envoy/upstream:load_balancer_interface", "//include/envoy/upstream:upstream_interface", "//source/common/common:assert_lib", + "//source/common/config:well_known_names", "//source/common/protobuf:utility_lib", "//source/common/runtime:runtime_protos_lib", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index dc731e6038b3..182738d81356 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -1316,6 +1316,11 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( cluster->statsScope(), parent.parent_.runtime_, parent.parent_.random_, cluster->lbSubsetInfo(), cluster->lbRingHashConfig(), cluster->lbMaglevConfig(), cluster->lbLeastRequestConfig(), cluster->lbConfig()); + } else if (auto config = cluster->lbShuffleShardConfig()) { + lb_ = std::make_unique( + cluster->lbType(), priority_set_, parent_.local_priority_set_, + cluster->stats(), parent.parent_.runtime_, parent.parent_.random_, + cluster->lbConfig(), *config); } else { switch (cluster->lbType()) { case LoadBalancerType::LeastRequest: { diff --git a/source/common/upstream/infima.h b/source/common/upstream/infima.h new file mode 100644 index 000000000000..fb4177434a54 --- /dev/null +++ b/source/common/upstream/infima.h @@ -0,0 +1,156 @@ +#include +#include +#include +#include +#include +#include + +template +class Lattice { +public: + using Coordinate = std::vector; + + Lattice(std::vector dimension_names) + : dimension_names_(dimension_names) { + for (std::string dimension : dimension_names) { + values_by_dimension_[dimension] = std::vector(); + } + } + + void remove_endpoints_for_sector(Coordinate sector_coordinates, std::vector endpoints) { + auto& ep = endpoints_by_coordinate_[sector_coordinates]; + + for (auto& endpoint : endpoints) + ep.erase(std::remove(ep.begin(), ep.end(), endpoint), ep.end()); + + if (!ep.size()) { + // todo + } + } + + void add_endpoints_for_sector(Coordinate sector_coordinates, std::vector endpoints) { + // Add coordinate value if it's not already present + if(!endpoints_by_coordinate_[sector_coordinates].size()) + for (uint i = 0; i < dimension_names_.size(); i++) + values_by_dimension_[dimension_names_[i]].push_back(sector_coordinates[i]); + + endpoints_by_coordinate_[sector_coordinates].insert( + endpoints_by_coordinate_[sector_coordinates].end(), + endpoints.begin(), + endpoints.end() + ); + } + + std::vector get_dimension_names() { + return dimension_names_; + } + + std::vector get_endpoints_for_sector(Coordinate sector_coordinates) { + return endpoints_by_coordinate_[sector_coordinates]; + } + + std::vector get_dimension_values(std::string dimension_name) { + return values_by_dimension_[dimension_name]; + } + + std::map get_dimensionality() { + std::map dimensionality; + for (auto dimension : dimension_names_) { + dimensionality[dimension] = get_dimension_values(dimension).size(); + } + return dimensionality; + } + + void print_values_by_dimension() { + std::cout << "Values By Dimension:" << std::endl; + for (auto it = values_by_dimension_.begin(); it != values_by_dimension_.end(); it++) { + std::cout << it->first << ": "; + for (auto s : it->second) { + std::cout << s << ", "; + } + std::cout << std::endl; + } + } + + void print_endpoints_by_coordinate() { + std::cout << "Endpoints by Coordinate:" << std::endl; + for (auto it = endpoints_by_coordinate_.begin(); it != endpoints_by_coordinate_.end(); it++) { + std::cout << "Coordinate("; + for (auto c : it->first) std::cout << c << ", "; + std::cout << "): "; + for (auto s : it->second) std::cout << s << ", "; + std::cout << std::endl; + } + } + + std::vector get_coordinates() { + std::vector v; + for( auto it = endpoints_by_coordinate_.begin(); it != endpoints_by_coordinate_.end(); ++it ) { + v.push_back(it->first); + } + return v; + } + + std::vector get_endpoints() { + std::vector v; + for( auto it = endpoints_by_coordinate_.begin(); it != endpoints_by_coordinate_.end(); ++it ) + v.insert(v.end(), it->second.begin(), it->second.end()); + return v; + } + + const std::vector dimension_names_; + std::map > values_by_dimension_; + std::map> endpoints_by_coordinate_; +}; + +template +class ShuffleSharder { +public: + ShuffleSharder(uint64_t seed) : seed_(seed) { } + + Lattice* shuffleShard(Lattice lattice, uint64_t hash, unsigned long endpoints_per_cell) { + Lattice * chosen = new Lattice(lattice.get_dimension_names()); + + std::vector> shuffled_dimension_values; + std::mt19937 g(seed_ + hash); + + for (std::string dimensionName : lattice.get_dimension_names()) { + std::vector shuffled_values = lattice.get_dimension_values(dimensionName); + std::shuffle(shuffled_values.begin(), shuffled_values.end(), g); + shuffled_dimension_values.push_back(shuffled_values); + } + + auto dimensionality = lattice.get_dimensionality(); + + if (dimensionality.size() == 1) { + for (auto dimension_value : shuffled_dimension_values[0]) { + std::vector c{dimension_value}; + auto available_endpoints = lattice.get_endpoints_for_sector(c); + std::shuffle(available_endpoints.begin(), available_endpoints.end(), g); + std::vector returned_endpoints(available_endpoints.begin(), available_endpoints.begin() + std::min(endpoints_per_cell, available_endpoints.size())); + chosen->add_endpoints_for_sector(c, returned_endpoints); + } + return chosen; + } + + int minimum_dimension_size = INT_MAX; + for (auto it = dimensionality.begin(); it != dimensionality.end(); it++) { + if (it->second < minimum_dimension_size) { + minimum_dimension_size = it->second; + } + } + + for (auto coordinates : lattice.get_coordinates()) { + auto available_endpoints = lattice.get_endpoints_for_sector(coordinates); + if (available_endpoints.size()) { + std::shuffle(available_endpoints.begin(), available_endpoints.end(), g); + std::vector returned_endpoints(available_endpoints.begin(), available_endpoints.begin() + std::min(endpoints_per_cell, available_endpoints.size())); + chosen->add_endpoints_for_sector(coordinates, returned_endpoints); + } + } + + return chosen; + } + + const uint64_t seed_; +}; diff --git a/source/common/upstream/load_balancer_impl.cc b/source/common/upstream/load_balancer_impl.cc index 6278a4ab39dc..e9743b979b4e 100644 --- a/source/common/upstream/load_balancer_impl.cc +++ b/source/common/upstream/load_balancer_impl.cc @@ -10,6 +10,7 @@ #include "envoy/upstream/upstream.h" #include "common/common/assert.h" +#include "common/config/well_known_names.h" #include "common/protobuf/utility.h" #include "absl/container/fixed_array.h" @@ -890,6 +891,133 @@ HostConstSharedPtr RandomLoadBalancer::peekOrChoose(LoadBalancerContext* context return hosts_to_use[random_hash % hosts_to_use.size()]; } +ShuffleShardLoadBalancer::ShuffleShardLoadBalancer(LoadBalancerType lb_type, const PrioritySet& priority_set, const PrioritySet* local_priority_set, + ClusterStats& stats, Runtime::Loader& runtime, Random::RandomGenerator& random, + const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config, + const envoy::config::cluster::v3::Cluster::LbShuffleShardConfig& config) + : ZoneAwareLoadBalancerBase(priority_set, local_priority_set, stats, runtime, random, + common_config), + lb_type_(lb_type), + endpoints_per_cell_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, endpoints_per_cell, 2)), + use_zone_as_dimension_(config.use_zone_as_dimension()), + use_dimensions_(config.dimensions().size()), + shuffle_sharder_(ShuffleSharder(12345)) { + + // ENVOY_LOG(debug, "endpoints_per_cell: {}", endpoints_per_cell_); + // ENVOY_LOG(debug, "use_zone_as_dimension: {}", use_zone_as_dimension_); + + for (auto dimension : config.dimensions()) + dimensions_.push_back(dimension); + if (use_zone_as_dimension_) + dimensions_.push_back("_envoy_zone"); + if (!dimensions_.size()) + dimensions_.push_back("_envoy_unit_coord"); + + lattice_ = new Lattice(dimensions_); + + priority_update_cb_ = priority_set_.addPriorityUpdateCb( + [this](uint32_t /*priority*/, const HostVector& hosts_added, const HostVector& hosts_removed) -> void { + add_hosts(hosts_added); + remove_hosts(hosts_removed); + }); +} + +HostConstSharedPtr ShuffleShardLoadBalancer::chooseHostOnce(LoadBalancerContext* context) { + absl::optional hash; + if (context) { + hash = context->computeHashKey(); + } + const uint64_t seed = hash ? hash.value() : random_.random(); + + auto endpoints = shuffle_sharder_.shuffleShard(*lattice_, seed, endpoints_per_cell_)->get_endpoints(); + + // Random + if (lb_type_ == LoadBalancerType::Random) + return endpoints[random_.random() % endpoints.size()]; + + // Least Request + HostConstSharedPtr candidate_host = nullptr; + for (uint32_t choice_idx = 0; choice_idx < 2; ++choice_idx) { + const int rand_idx = random_.random() % endpoints.size(); + HostConstSharedPtr sampled_host = endpoints[rand_idx]; + + if (candidate_host == nullptr) { + // Make a first choice to start the comparisons. + candidate_host = sampled_host; + continue; + } + + const auto candidate_active_rq = candidate_host->stats().rq_active_.value(); + const auto sampled_active_rq = sampled_host->stats().rq_active_.value(); + if (sampled_active_rq < candidate_active_rq) { + candidate_host = sampled_host; + } + } + + return candidate_host; +} + +void ShuffleShardLoadBalancer::remove_hosts(const HostVector& hosts) { + for (auto& host : hosts) { + auto coord = get_coord(host); + if (!coord) { + continue; + } + + std::vector hosts; + hosts.push_back(host); + lattice_->remove_endpoints_for_sector(*coord, hosts); + } +} + +void ShuffleShardLoadBalancer::add_hosts(const HostVector& hosts) { + for (auto& host : hosts) { + auto coord = get_coord(host); + if (!coord) { + continue; + } + + std::vector hosts; + hosts.push_back(host); + lattice_->add_endpoints_for_sector(*coord, hosts); + } +} + +std::optional> ShuffleShardLoadBalancer::get_coord(const HostConstSharedPtr& host) { + std::vector coord; + + if (use_dimensions_) { + if (!host->metadata()) { + // ENVOY_LOG(warn, "ignoring host {} because it has no metadata", host->hostname()); + return {}; + } + const envoy::config::core::v3::Metadata& metadata = *host->metadata(); + const auto& filter_it = metadata.filter_metadata().find(Envoy::Config::MetadataFilters::get().ENVOY_LB); + if (filter_it == metadata.filter_metadata().end()) { + // ENVOY_LOG(warn, "ignoring host {} because it has no envoy.lb metadata", host->hostname()); + return {}; + } + const auto& fields = filter_it->second.fields(); + for (const auto& key : dimensions_) { + const auto it = fields.find(key); + if (it == fields.end()) { + // ENVOY_LOG(warn, "ignoring host {} because it has no envoy.lb tag {}", host->hostname(), key); + return {}; + } + std::ostringstream buf; + buf << MessageUtil::getJsonStringFromMessageOrDie(it->second); + coord.push_back(buf.str()); + } + } + if (use_zone_as_dimension_) { + coord.push_back(host->locality().zone()); + } + if (!coord.size()) { + coord.push_back("_envoy_unit_coord"); + } + return coord; +} + SubsetSelectorImpl::SubsetSelectorImpl( const Protobuf::RepeatedPtrField& selector_keys, envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetSelector:: diff --git a/source/common/upstream/load_balancer_impl.h b/source/common/upstream/load_balancer_impl.h index db9f815dce0e..f39bb24d7c15 100644 --- a/source/common/upstream/load_balancer_impl.h +++ b/source/common/upstream/load_balancer_impl.h @@ -18,6 +18,8 @@ #include "common/runtime/runtime_protos.h" #include "common/upstream/edf_scheduler.h" +#include "infima.h" + namespace Envoy { namespace Upstream { @@ -587,6 +589,35 @@ class RandomLoadBalancer : public ZoneAwareLoadBalancerBase { HostConstSharedPtr peekOrChoose(LoadBalancerContext* context, bool peek); }; + +class ShuffleShardLoadBalancer : public ZoneAwareLoadBalancerBase { +public: + ShuffleShardLoadBalancer(LoadBalancerType lb_type, const PrioritySet& priority_set, const PrioritySet* local_priority_set, + ClusterStats& stats, Runtime::Loader& runtime, Random::RandomGenerator& random, + const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config, + const envoy::config::cluster::v3::Cluster::LbShuffleShardConfig& config); + + HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override; + + HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } + +private: + void remove_hosts(const HostVector&); + + void add_hosts(const HostVector&); + + std::optional> get_coord(const HostConstSharedPtr&); + + const LoadBalancerType lb_type_; + const uint32_t endpoints_per_cell_; + const bool use_zone_as_dimension_; + std::vector dimensions_; + const bool use_dimensions_; + Lattice* lattice_; + ShuffleSharder shuffle_sharder_; + Common::CallbackHandlePtr priority_update_cb_; +}; + /** * Implementation of SubsetSelector */ diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 5b8918ba5a19..c3dae0e42d51 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -740,6 +740,7 @@ ClusterInfoImpl::ClusterInfoImpl( maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)), source_address_(getSourceAddress(config, bind_config)), lb_least_request_config_(config.least_request_lb_config()), + lb_shuffle_shard_config_(config.lb_shuffle_shard_config()), lb_ring_hash_config_(config.ring_hash_lb_config()), lb_maglev_config_(config.maglev_lb_config()), lb_original_dst_config_(config.original_dst_lb_config()), diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 3510313adc7d..896d404ff2ab 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -576,6 +576,10 @@ class ClusterInfoImpl : public ClusterInfo, lbLeastRequestConfig() const override { return lb_least_request_config_; } + const absl::optional& + lbShuffleShardConfig() const override { + return lb_shuffle_shard_config_; + } const absl::optional& lbRingHashConfig() const override { return lb_ring_hash_config_; @@ -708,6 +712,8 @@ class ClusterInfoImpl : public ClusterInfo, LoadBalancerType lb_type_; absl::optional lb_least_request_config_; + absl::optional + lb_shuffle_shard_config_; absl::optional lb_ring_hash_config_; absl::optional lb_maglev_config_; absl::optional lb_original_dst_config_;