Skip to content

Commit

Permalink
Shuffle Shard LB
Browse files Browse the repository at this point in the history
Signed-off-by: Charlie Getzen <[email protected]>
  • Loading branch information
cgetzen committed Mar 9, 2021
1 parent f2a517c commit 8c1bdfb
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 1 deletion.
14 changes: 14 additions & 0 deletions api/envoy/config/cluster/v3/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,17 @@ message Cluster {
bool use_http_header = 1;
}

// Settings for the shuffle shard load balancer. Hosts are grouped into cells,
// where a cell is identified by one value per configured dimension, and a
// bounded number of hosts is picked from the cells for each request.
// NOTE(review): the `previous_message_type` below names a v2 message; confirm
// that `envoy.api.v2.Cluster.LbShuffleShardConfig` actually exists.
message LbShuffleShardConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.api.v2.Cluster.LbShuffleShardConfig";

// Maximum number of endpoints selected from a single cell. The load balancer
// falls back to 2 when this field is not set.
google.protobuf.UInt32Value endpoints_per_cell = 1;

// When true, the zone of each host's locality is used as an additional
// dimension.
bool use_zone_as_dimension = 2;

// Keys looked up in each host's load balancer filter metadata; the value found
// under each key becomes that host's coordinate along the dimension. A host
// that is missing the metadata or any listed key is ignored by the load
// balancer.
repeated string dimensions = 3;
}

// Common configuration for all load balancer implementations.
// [#next-free-field: 8]
message CommonLbConfig {
Expand Down Expand Up @@ -887,6 +898,9 @@ message Cluster {
// Configuration for load balancing subsetting.
LbSubsetConfig lb_subset_config = 22;

// Configuration for shuffle shard load balancing.
LbShuffleShardConfig lb_shuffle_shard_config = 53;

// Optional configuration for the load balancing algorithm selected by
// LbPolicy. Currently only
// :ref:`RING_HASH<envoy_api_enum_value_config.cluster.v3.Cluster.LbPolicy.RING_HASH>`,
Expand Down
14 changes: 14 additions & 0 deletions generated_api_shadow/envoy/config/cluster/v3/cluster.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ class ClusterInfo {
virtual const absl::optional<envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>&
lbLeastRequestConfig() const PURE;

/**
 * @return configuration for shuffle shard load balancing. The optional holds no value when the
 *         cluster does not configure shuffle sharding.
 *         NOTE(review): the cluster manager appears to pick the shuffle shard load balancer
 *         whenever this optional holds a value -- confirm against the implementation.
 */
virtual const absl::optional<envoy::config::cluster::v3::Cluster::LbShuffleShardConfig>&
lbShuffleShardConfig() const PURE;

/**
* @return configuration for ring hash load balancing, only used if type is set to ring_hash_lb.
*/
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ envoy_cc_library(
envoy_cc_library(
name = "load_balancer_lib",
srcs = ["load_balancer_impl.cc"],
hdrs = ["load_balancer_impl.h"],
hdrs = ["load_balancer_impl.h", "infima.h"],
deps = [
":edf_scheduler_lib",
"//include/envoy/common:random_generator_interface",
Expand All @@ -185,6 +185,7 @@ envoy_cc_library(
"//include/envoy/upstream:load_balancer_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/common:assert_lib",
"//source/common/config:well_known_names",
"//source/common/protobuf:utility_lib",
"//source/common/runtime:runtime_protos_lib",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
Expand Down
5 changes: 5 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,11 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
cluster->statsScope(), parent.parent_.runtime_, parent.parent_.random_,
cluster->lbSubsetInfo(), cluster->lbRingHashConfig(), cluster->lbMaglevConfig(),
cluster->lbLeastRequestConfig(), cluster->lbConfig());
} else if (auto config = cluster->lbShuffleShardConfig()) {
lb_ = std::make_unique<ShuffleShardLoadBalancer>(
cluster->lbType(), priority_set_, parent_.local_priority_set_,
cluster->stats(), parent.parent_.runtime_, parent.parent_.random_,
cluster->lbConfig(), *config);
} else {
switch (cluster->lbType()) {
case LoadBalancerType::LeastRequest: {
Expand Down
156 changes: 156 additions & 0 deletions source/common/upstream/infima.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#include <algorithm>
#include <climits>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <random>
#include <string>
#include <vector>

/**
 * A grid of endpoints addressed by named dimensions.
 *
 * Every endpoint lives in exactly one sector. A sector is identified by a Coordinate, which
 * carries one value per dimension in the same order as the dimension names given to the
 * constructor. The lattice additionally tracks, per dimension, the distinct values that are
 * currently in use by at least one sector.
 */
template <typename Type>
class Lattice {
public:
  using Coordinate = std::vector<std::string>;

  /**
   * @param dimension_names ordered names of the dimensions; a Coordinate must supply one value
   *        per name, in this order.
   */
  Lattice(std::vector<std::string> dimension_names) : dimension_names_(dimension_names) {
    for (const std::string& dimension : dimension_names_) {
      values_by_dimension_[dimension] = std::vector<std::string>();
    }
  }

  /**
   * Removes every occurrence of the given endpoints from a sector.
   *
   * A sector that becomes empty is dropped, together with any dimension value that no remaining
   * sector uses, so that stale values are not handed out by get_dimension_values(). Unknown
   * sectors are ignored and are not created as a side effect.
   *
   * @param sector_coordinates coordinate of the sector to remove from.
   * @param endpoints endpoints to remove. Taken by value so that a caller may safely pass a
   *        vector that lives inside this lattice.
   */
  void remove_endpoints_for_sector(const Coordinate& sector_coordinates,
                                   std::vector<Type> endpoints) {
    const auto sector = endpoints_by_coordinate_.find(sector_coordinates);
    if (sector == endpoints_by_coordinate_.end()) {
      return;
    }

    std::vector<Type>& stored = sector->second;
    for (const Type& endpoint : endpoints) {
      stored.erase(std::remove(stored.begin(), stored.end(), endpoint), stored.end());
    }

    if (stored.empty()) {
      // Copy the key first: the argument may refer to the map entry that is about to be erased.
      const Coordinate removed = sector->first;
      endpoints_by_coordinate_.erase(sector);
      prune_dimension_values(removed);
    }
  }

  /**
   * Appends endpoints to a sector, creating the sector when it does not exist yet.
   *
   * Each coordinate value is registered for its dimension at most once, no matter how many
   * sectors share it or how often the sector is refilled. A coordinate whose length does not
   * match the number of dimensions cannot be placed in the lattice and is ignored.
   *
   * @param sector_coordinates coordinate of the sector to add to.
   * @param endpoints endpoints to append. Taken by value so that a caller may safely pass a
   *        vector that lives inside this lattice.
   */
  void add_endpoints_for_sector(const Coordinate& sector_coordinates,
                                std::vector<Type> endpoints) {
    if (sector_coordinates.size() != dimension_names_.size()) {
      return;
    }

    for (std::size_t i = 0; i < dimension_names_.size(); ++i) {
      std::vector<std::string>& known = values_by_dimension_[dimension_names_[i]];
      if (std::find(known.begin(), known.end(), sector_coordinates[i]) == known.end()) {
        known.push_back(sector_coordinates[i]);
      }
    }

    std::vector<Type>& stored = endpoints_by_coordinate_[sector_coordinates];
    stored.insert(stored.end(), endpoints.begin(), endpoints.end());
  }

  /** @return the dimension names in the order given at construction. */
  std::vector<std::string> get_dimension_names() const { return dimension_names_; }

  /**
   * @return a copy of the endpoints stored in the sector, or an empty vector when the sector is
   *         unknown. Looking up a sector never creates it.
   */
  std::vector<Type> get_endpoints_for_sector(const Coordinate& sector_coordinates) const {
    const auto sector = endpoints_by_coordinate_.find(sector_coordinates);
    if (sector == endpoints_by_coordinate_.end()) {
      return {};
    }
    return sector->second;
  }

  /**
   * @return the distinct values registered for a dimension, in first-seen order; empty when the
   *         dimension is unknown.
   */
  std::vector<std::string> get_dimension_values(const std::string& dimension_name) const {
    const auto values = values_by_dimension_.find(dimension_name);
    if (values == values_by_dimension_.end()) {
      return {};
    }
    return values->second;
  }

  /** @return for every dimension name, the number of values registered for it. */
  std::map<std::string, int> get_dimensionality() const {
    std::map<std::string, int> dimensionality;
    for (const std::string& dimension : dimension_names_) {
      const auto values = values_by_dimension_.find(dimension);
      dimensionality[dimension] =
          values == values_by_dimension_.end() ? 0 : static_cast<int>(values->second.size());
    }
    return dimensionality;
  }

  /** Debug helper: writes the registered values of every dimension to stdout. */
  void print_values_by_dimension() const {
    std::cout << "Values By Dimension:" << '\n';
    for (const auto& entry : values_by_dimension_) {
      std::cout << entry.first << ": ";
      for (const std::string& value : entry.second) {
        std::cout << value << ", ";
      }
      std::cout << '\n';
    }
  }

  /** Debug helper: writes every sector and its endpoints to stdout. */
  void print_endpoints_by_coordinate() const {
    std::cout << "Endpoints by Coordinate:" << '\n';
    for (const auto& entry : endpoints_by_coordinate_) {
      std::cout << "Coordinate(";
      for (const std::string& component : entry.first) {
        std::cout << component << ", ";
      }
      std::cout << "): ";
      for (const Type& endpoint : entry.second) {
        std::cout << endpoint << ", ";
      }
      std::cout << '\n';
    }
  }

  /** @return the coordinates of all sectors, in the map's key order. */
  std::vector<Coordinate> get_coordinates() const {
    std::vector<Coordinate> coordinates;
    coordinates.reserve(endpoints_by_coordinate_.size());
    for (const auto& entry : endpoints_by_coordinate_) {
      coordinates.push_back(entry.first);
    }
    return coordinates;
  }

  /** @return the endpoints of all sectors, concatenated in the map's key order. */
  std::vector<Type> get_endpoints() const {
    std::vector<Type> all;
    for (const auto& entry : endpoints_by_coordinate_) {
      all.insert(all.end(), entry.second.begin(), entry.second.end());
    }
    return all;
  }

  const std::vector<std::string> dimension_names_;
  std::map<std::string, std::vector<std::string>> values_by_dimension_;
  std::map<Coordinate, std::vector<Type>> endpoints_by_coordinate_;

private:
  // Unregisters each component of `removed` whose value is no longer used, at the same position,
  // by any sector that is still present.
  void prune_dimension_values(const Coordinate& removed) {
    for (std::size_t i = 0; i < dimension_names_.size() && i < removed.size(); ++i) {
      const std::string& value = removed[i];
      const bool still_used =
          std::any_of(endpoints_by_coordinate_.begin(), endpoints_by_coordinate_.end(),
                      [&](const auto& entry) {
                        return i < entry.first.size() && entry.first[i] == value;
                      });
      if (still_used) {
        continue;
      }

      const auto known = values_by_dimension_.find(dimension_names_[i]);
      if (known == values_by_dimension_.end()) {
        continue;
      }
      std::vector<std::string>& values = known->second;
      values.erase(std::remove(values.begin(), values.end(), value), values.end());
    }
  }
};

/**
 * Derives a per-key subset ("shard") of a Lattice.
 *
 * The selection is driven by a pseudo-random generator seeded from the sharder's seed plus the
 * caller supplied hash, so the same lattice, seed and hash always produce the same shard.
 */
template <typename Type>
class ShuffleSharder {
public:
  /** @param seed base seed that is combined with the per-call hash. */
  ShuffleSharder(uint64_t seed) : seed_(seed) {}

  /**
   * Builds the shard of `lattice` for `hash`.
   *
   * @param lattice the full lattice to pick endpoints from.
   * @param hash per-call key that is added to the seed.
   * @param endpoints_per_cell upper bound on the number of endpoints taken from each sector.
   * @return a heap-allocated lattice holding the chosen endpoints. The caller owns the returned
   *         object and is responsible for deleting it.
   */
  Lattice<Type>* shuffleShard(Lattice<Type> lattice, uint64_t hash,
                              unsigned long endpoints_per_cell) {
    // Owned by a unique_ptr while it is being filled so that it is not leaked if anything below
    // throws; ownership is handed to the caller at the end.
    auto chosen = std::make_unique<Lattice<Type>>(lattice.get_dimension_names());

    // NOTE(review): std::mt19937 takes a 32-bit seed, so the 64-bit sum is truncated here.
    std::mt19937 g(seed_ + hash);

    // Shuffling also advances the generator, so it is done for every dimension even though only
    // the single-dimension case below consumes the shuffled order.
    std::vector<std::vector<std::string>> shuffled_dimension_values;
    for (const std::string& dimension_name : lattice.get_dimension_names()) {
      std::vector<std::string> shuffled_values = lattice.get_dimension_values(dimension_name);
      std::shuffle(shuffled_values.begin(), shuffled_values.end(), g);
      shuffled_dimension_values.push_back(shuffled_values);
    }

    if (lattice.get_dimensionality().size() == 1) {
      // One dimension: visit the sectors in shuffled value order. Sectors without endpoints are
      // still registered in the result, with an empty endpoint list.
      for (const std::string& dimension_value : shuffled_dimension_values[0]) {
        const std::vector<std::string> sector{dimension_value};
        chosen->add_endpoints_for_sector(
            sector, pickEndpoints(lattice.get_endpoints_for_sector(sector), endpoints_per_cell, g));
      }
      return chosen.release();
    }

    // Several dimensions: visit every populated sector in coordinate order.
    for (const auto& coordinates : lattice.get_coordinates()) {
      std::vector<Type> available = lattice.get_endpoints_for_sector(coordinates);
      if (available.empty()) {
        continue;
      }
      chosen->add_endpoints_for_sector(coordinates,
                                       pickEndpoints(available, endpoints_per_cell, g));
    }

    return chosen.release();
  }

  const uint64_t seed_;

private:
  // Shuffles `available` with `g` and keeps at most `limit` endpoints from the front.
  static std::vector<Type> pickEndpoints(std::vector<Type> available, unsigned long limit,
                                         std::mt19937& g) {
    std::shuffle(available.begin(), available.end(), g);
    // Explicit template argument: `unsigned long` and `size_t` are different types on some
    // platforms, where the deduced form of std::min does not compile.
    const std::size_t keep = std::min<std::size_t>(limit, available.size());
    available.resize(keep);
    return available;
  }
};
128 changes: 128 additions & 0 deletions source/common/upstream/load_balancer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "envoy/upstream/upstream.h"

#include "common/common/assert.h"
#include "common/config/well_known_names.h"
#include "common/protobuf/utility.h"

#include "absl/container/fixed_array.h"
Expand Down Expand Up @@ -890,6 +891,133 @@ HostConstSharedPtr RandomLoadBalancer::peekOrChoose(LoadBalancerContext* context
return hosts_to_use[random_hash % hosts_to_use.size()];
}

/**
 * Builds a shuffle shard load balancer on top of the zone aware base.
 *
 * The lattice dimensions are the configured dimension names, followed by "_envoy_zone" when the
 * zone is used as a dimension. When neither is configured a single "_envoy_unit_coord" dimension
 * is used. The lattice is kept up to date through a priority update callback.
 */
ShuffleShardLoadBalancer::ShuffleShardLoadBalancer(
    LoadBalancerType lb_type, const PrioritySet& priority_set,
    const PrioritySet* local_priority_set, ClusterStats& stats, Runtime::Loader& runtime,
    Random::RandomGenerator& random,
    const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config,
    const envoy::config::cluster::v3::Cluster::LbShuffleShardConfig& config)
    : ZoneAwareLoadBalancerBase(priority_set, local_priority_set, stats, runtime, random,
                                common_config),
      lb_type_(lb_type),
      endpoints_per_cell_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, endpoints_per_cell, 2)),
      use_zone_as_dimension_(config.use_zone_as_dimension()),
      use_dimensions_(config.dimensions().size()),
      shuffle_sharder_(ShuffleSharder<HostConstSharedPtr>(12345)) {
  // Configured dimensions come first; the synthetic entries are appended after them.
  for (const auto& configured_dimension : config.dimensions()) {
    dimensions_.push_back(configured_dimension);
  }
  if (use_zone_as_dimension_) {
    dimensions_.push_back("_envoy_zone");
  }
  if (dimensions_.empty()) {
    dimensions_.push_back("_envoy_unit_coord");
  }

  lattice_ = new Lattice<HostConstSharedPtr>(dimensions_);

  // Mirror host membership changes into the lattice: additions first, then removals.
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
      [this](uint32_t /*priority*/, const HostVector& added, const HostVector& removed) -> void {
        add_hosts(added);
        remove_hosts(removed);
      });
}

/**
 * Picks a host from the shuffle shard that belongs to the request.
 *
 * The shard is derived from the context's hash key when one is available and from a random
 * value otherwise. Within the shard the host is chosen at random for the Random load balancer
 * type; for every other type two hosts are sampled and the one with fewer active requests wins.
 *
 * @param context optional load balancer context supplying the hash key; may be null.
 * @return the chosen host, or nullptr when the shard contains no endpoints.
 */
HostConstSharedPtr ShuffleShardLoadBalancer::chooseHostOnce(LoadBalancerContext* context) {
  absl::optional<uint64_t> hash;
  if (context != nullptr) {
    hash = context->computeHashKey();
  }
  const uint64_t seed = hash ? hash.value() : random_.random();

  // shuffleShard() returns a heap-allocated lattice owned by the caller; hold it in a unique_ptr
  // so that it is released on every path instead of leaking once per call.
  const std::unique_ptr<Lattice<HostConstSharedPtr>> shard(
      shuffle_sharder_.shuffleShard(*lattice_, seed, endpoints_per_cell_));
  const std::vector<HostConstSharedPtr> endpoints = shard->get_endpoints();

  // Without endpoints there is nothing to pick, and the modulo below would divide by zero.
  if (endpoints.empty()) {
    return nullptr;
  }

  // Random
  if (lb_type_ == LoadBalancerType::Random) {
    return endpoints[random_.random() % endpoints.size()];
  }

  // Least Request: sample two hosts and keep the one with fewer active requests. The first
  // sample wins ties.
  HostConstSharedPtr candidate_host = endpoints[random_.random() % endpoints.size()];
  const HostConstSharedPtr sampled_host = endpoints[random_.random() % endpoints.size()];

  const auto candidate_active_rq = candidate_host->stats().rq_active_.value();
  const auto sampled_active_rq = sampled_host->stats().rq_active_.value();
  if (sampled_active_rq < candidate_active_rq) {
    candidate_host = sampled_host;
  }

  return candidate_host;
}

void ShuffleShardLoadBalancer::remove_hosts(const HostVector& hosts) {
for (auto& host : hosts) {
auto coord = get_coord(host);
if (!coord) {
continue;
}

std::vector<HostConstSharedPtr> hosts;
hosts.push_back(host);
lattice_->remove_endpoints_for_sector(*coord, hosts);
}
}

void ShuffleShardLoadBalancer::add_hosts(const HostVector& hosts) {
for (auto& host : hosts) {
auto coord = get_coord(host);
if (!coord) {
continue;
}

std::vector<HostConstSharedPtr> hosts;
hosts.push_back(host);
lattice_->add_endpoints_for_sector(*coord, hosts);
}
}

/**
 * Computes the lattice coordinate of a host.
 *
 * The coordinate holds, in order: the JSON rendering of the host's load balancer metadata value
 * for every configured dimension, then the host's zone when the zone is used as a dimension.
 * When neither applies the coordinate is the single value "_envoy_unit_coord".
 *
 * @param host the host to locate.
 * @return the coordinate, or an empty optional when dimensions are configured and the host lacks
 *         the metadata, the load balancer filter entry, or one of the dimension keys.
 */
std::optional<std::vector<std::string>>
ShuffleShardLoadBalancer::get_coord(const HostConstSharedPtr& host) {
  std::vector<std::string> coord;

  if (use_dimensions_) {
    if (!host->metadata()) {
      // ENVOY_LOG(warn, "ignoring host {} because it has no metadata", host->hostname());
      return std::nullopt;
    }
    const envoy::config::core::v3::Metadata& metadata = *host->metadata();
    const auto filter_it =
        metadata.filter_metadata().find(Envoy::Config::MetadataFilters::get().ENVOY_LB);
    if (filter_it == metadata.filter_metadata().end()) {
      // ENVOY_LOG(warn, "ignoring host {} because it has no envoy.lb metadata", host->hostname());
      return std::nullopt;
    }
    const auto& fields = filter_it->second.fields();

    // The constructor appends the synthetic "_envoy_zone" entry after the configured dimensions.
    // That entry is filled from the locality below and must not be looked up in the metadata,
    // otherwise every host would be rejected for lacking an "_envoy_zone" key.
    const std::size_t metadata_dimension_count =
        dimensions_.size() - (use_zone_as_dimension_ ? 1 : 0);
    for (std::size_t i = 0; i < metadata_dimension_count; ++i) {
      const auto& key = dimensions_[i];
      const auto it = fields.find(key);
      if (it == fields.end()) {
        // ENVOY_LOG(warn, "ignoring host {} because it has no envoy.lb tag {}", host->hostname(), key);
        return std::nullopt;
      }
      coord.push_back(MessageUtil::getJsonStringFromMessageOrDie(it->second));
    }
  }
  if (use_zone_as_dimension_) {
    coord.push_back(host->locality().zone());
  }
  if (coord.empty()) {
    coord.push_back("_envoy_unit_coord");
  }
  return coord;
}

SubsetSelectorImpl::SubsetSelectorImpl(
const Protobuf::RepeatedPtrField<std::string>& selector_keys,
envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetSelector::
Expand Down
Loading

0 comments on commit 8c1bdfb

Please sign in to comment.