Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream: Implement WRSQ Scheduler #14681

Merged
merged 20 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,10 @@ envoy_cc_library(
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "scheduler_interface",
hdrs = ["scheduler.h"],
deps = [
],
)
57 changes: 57 additions & 0 deletions envoy/upstream/scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include <functional>
#include <memory>

namespace Envoy {
namespace Upstream {

/**
* The base class for scheduler implementations used in various load balancers.
*/
template <class C> class Scheduler {
public:
virtual ~Scheduler() = default;

/**
* Each time peekAgain is called, it will return the best-effort subsequent
* pick, popping and reinserting the entry as if it had been picked.
* The first time peekAgain is called, it will return the
* first item which will be picked, the second time it is called it will
* return the second item which will be picked. As picks occur, that window
* will shrink.
*
* @param calculate_weight for implemenations that choose to support it, this predicate specifies
* the new weight of the entry.
* @return std::shared_ptr<C> the best effort subsequent pick.
*/

virtual std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) = 0;

/**
* Pick a queue entry with closest deadline.
*
* @param calculate_weight for implemenations that choose to support it, this predicate specifies
* the new weight of the entry.
* @return std::shared_ptr<C> to next valid the queue entry if or nullptr if none exists.
*/
virtual std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) = 0;

/**
* Insert entry into queue with a given weight.
*
* @param weight entry weight.
* @param entry shared pointer to entry.
*/
virtual void add(double weight, std::shared_ptr<C> entry) = 0;

/**
* Returns true if the scheduler is empty and nothing has been added.
*
* @return bool whether or not the internal container is empty.
*/
virtual bool empty() const = 0;
};

} // namespace Upstream
} // namespace Envoy
16 changes: 12 additions & 4 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,17 @@ envoy_cc_library(
)

envoy_cc_library(
name = "edf_scheduler_lib",
hdrs = ["edf_scheduler.h"],
deps = ["//source/common/common:assert_lib"],
name = "scheduler_lib",
hdrs = [
"edf_scheduler.h",
"wrsq_scheduler.h",
],
deps = [
"//envoy/common:random_generator_interface",
"//envoy/upstream:scheduler_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
Expand Down Expand Up @@ -198,7 +206,7 @@ envoy_cc_library(
srcs = ["load_balancer_impl.cc"],
hdrs = ["load_balancer_impl.h"],
deps = [
":edf_scheduler_lib",
":scheduler_lib",
"//envoy/common:random_generator_interface",
"//envoy/runtime:runtime_interface",
"//envoy/stats:stats_interface",
Expand Down
34 changes: 8 additions & 26 deletions source/common/upstream/edf_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <iostream>
#include <queue>

#include "envoy/upstream/scheduler.h"

#include "source/common/common/assert.h"

namespace Envoy {
Expand All @@ -23,16 +25,10 @@ namespace Upstream {
// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
// at current time + 1 / weight, providing weighted round robin behavior with floating point
// weights and an O(log n) pick time.
template <class C> class EdfScheduler {
template <class C> class EdfScheduler : public Scheduler<C> {
public:
// Each time peekAgain is called, it will return the best-effort subsequent
// pick, popping and reinserting the entry as if it had been picked, and
// inserting it into the pre-picked queue.
// The first time peekAgain is called, it will return the
// first item which will be picked, the second time it is called it will
// return the second item which will be picked. As picks occur, that window
// will shrink.
std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) {
// See scheduler.h for an explanation of each public method.
std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
if (hasEntry()) {
prepick_list_.push_back(std::move(queue_.top().entry_));
std::shared_ptr<C> ret{prepick_list_.back()};
Expand All @@ -43,12 +39,7 @@ template <class C> class EdfScheduler {
return nullptr;
}

/**
* Pick queue entry with closest deadline and adds it back using the weight
* from calculate_weight.
* @return std::shared_ptr<C> to next valid the queue entry if or nullptr if none exists.
*/
std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) {
std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
while (!prepick_list_.empty()) {
// In this case the entry was added back during peekAgain so don't re-add.
if (prepick_list_.front().expired()) {
Expand All @@ -68,12 +59,7 @@ template <class C> class EdfScheduler {
return nullptr;
}

/**
* Insert entry into queue with a given weight. The deadline will be current_time_ + 1 / weight.
* @param weight floating point weight.
* @param entry shared pointer to entry, only a weak reference will be retained.
*/
void add(double weight, std::shared_ptr<C> entry) {
void add(double weight, std::shared_ptr<C> entry) override {
ASSERT(weight > 0);
const double deadline = current_time_ + 1.0 / weight;
EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.",
Expand All @@ -82,11 +68,7 @@ template <class C> class EdfScheduler {
ASSERT(queue_.top().deadline_ >= current_time_);
}

/**
* Implements empty() on the internal queue. Does not attempt to discard expired elements.
* @return bool whether or not the internal queue is empty.
*/
bool empty() const { return queue_.empty(); }
bool empty() const override { return queue_.empty(); }

private:
/**
Expand Down
191 changes: 191 additions & 0 deletions source/common/upstream/wrsq_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#pragma once

#include <algorithm>
#include <iostream>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

#include "envoy/common/random_generator.h"
#include "envoy/upstream/scheduler.h"

#include "source/common/common/assert.h"
#include "source/common/common/logger.h"

#include "absl/container/flat_hash_map.h"

namespace Envoy {
namespace Upstream {

// Weighted Random Selection Queue (WRSQ) Scheduler
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this comment after not thinking about this for 2 months, it can improve. Some observations to reference later:

  • Elaborate on why it's like vanilla RR or WRS based on the weight distribution.
  • Beef up the note.
  • Consider an ASCII diagram.

// ------------------------------------------------
// This scheduler keeps a queue for each unique weight among all objects inserted and adds the
// objects to their respective queue based on weight. When performing a pick operation, a queue is
// selected and an object is pulled. Each queue gets its own selection probability which is weighted
// as the sum of all weights of objects contained within. Once a queue is picked, you can simply
// pull from the top and honor the expected selection probability of each object.
//
// Adding an object will cause the scheduler to rebuild internal structures on the first pick that
// follows. This first pick operation will be linear on the number of unique weights among objects
// inserted. Subsequent picks will be logarithmic with the number of unique weights. Adding objects
// is always constant time.
//
// For the case where all object weights are the same, WRSQ behaves identical to vanilla
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does wrsq handle first pick determinism for the case that all hosts have the same weight?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that will be addressed in a subsequent patch when plumbing WRSQ into the load balancer. In the case you mention, first pick determinism is fixed for WRSQ by simply adding hosts to the scheduler in a random order during refresh.

// round-robin. If all object weights are different, it behaves identical to weighted random
// selection.
//
// NOTE: While the base scheduler interface allows for mutation of object weights with each pick,
// this implementation is not meant for circumstances where the object weights change with each pick
// (like in the least request LB). This scheduler implementation will perform quite poorly if the
// object weights change often.
template <class C>
class WRSQScheduler : public Scheduler<C>, protected Logger::Loggable<Logger::Id::upstream> {
public:
WRSQScheduler(Random::RandomGenerator& random) : random_(random) {}

std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
std::shared_ptr<C> picked{pickAndAddInternal(calculate_weight)};
if (picked != nullptr) {
prepick_queue_.emplace(picked);
}
return picked;
}

std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
// Burn through the pre-pick queue.
while (!prepick_queue_.empty()) {
std::shared_ptr<C> prepicked_obj = prepick_queue_.front().lock();
prepick_queue_.pop();
if (prepicked_obj != nullptr) {
return prepicked_obj;
}
}

return pickAndAddInternal(calculate_weight);
}

void add(double weight, std::shared_ptr<C> entry) override {
rebuild_cumulative_weights_ = true;
queue_map_[weight].emplace(std::move(entry));
}

bool empty() const override { return queue_map_.empty(); }

private:
using ObjQueue = std::queue<std::weak_ptr<C>>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may benefit from being a class that automatically sets rebuild_cumulative_weights_ when it's mutated.


// TODO(tonya11en): We can reduce memory utilization by using an absl::flat_hash_map of QueueInfo
// with heterogeneous lookup on the weight. This would allow us to save 8 bytes per unique weight.
using QueueMap = absl::flat_hash_map<double, ObjQueue>;

// Used to store a queue's weight info necessary to perform the weighted random selection.
struct QueueInfo {
double cumulative_weight;
double weight;
ObjQueue& q;
};

// If needed, such as after object expiry or addition, rebuild the cumulative weights vector.
void maybeRebuildCumulativeWeights() {
if (!rebuild_cumulative_weights_) {
return;
}

cumulative_weights_.clear();
cumulative_weights_.reserve(queue_map_.size());

double weight_sum = 0;
for (auto& it : queue_map_) {
const auto weight_val = it.first;
weight_sum += weight_val * it.second.size();
cumulative_weights_.push_back({weight_sum, weight_val, it.second});
}

rebuild_cumulative_weights_ = false;
}

// Performs a weighted random selection on the queues containing objects of the same weight.
// Popping off the top of the queue to pick an object will honor the selection probability based
// on the weight provided when the object was added.
QueueInfo& chooseQueue() {
ASSERT(!queue_map_.empty());

maybeRebuildCumulativeWeights();

const double weight_sum = cumulative_weights_.back().cumulative_weight;
uint64_t rnum = random_.random() % static_cast<uint32_t>(weight_sum);
auto it = std::upper_bound(cumulative_weights_.begin(), cumulative_weights_.end(), rnum,
[](auto a, auto b) { return a < b.cumulative_weight; });
ASSERT(it != cumulative_weights_.end());
return *it;
}

// Remove objects from the queue until it's empty or there is an unexpired object at the front. If
// the queue is purged to empty, it's removed from the queue map and we return true.
bool purgeExpired(QueueInfo& qinfo) {
while (!qinfo.q.empty() && qinfo.q.front().expired()) {
qinfo.q.pop();
rebuild_cumulative_weights_ = true;
}

if (qinfo.q.empty()) {
queue_map_.erase(qinfo.weight);
return true;
}
return false;
}

std::shared_ptr<C> pickAndAddInternal(std::function<double(const C&)> calculate_weight) {
while (!queue_map_.empty()) {
QueueInfo& qinfo = chooseQueue();
if (purgeExpired(qinfo)) {
// The chosen queue was purged to empty and removed from the queue map. Try again.
continue;
}

auto obj = qinfo.q.front().lock();
qinfo.q.pop();
if (obj == nullptr) {
// The object expired after the purge.
continue;
}

const double new_weight = calculate_weight ? calculate_weight(*obj) : qinfo.weight;
if (new_weight == qinfo.weight) {
qinfo.q.emplace(obj);
} else {
// The weight has changed for this object, so we must re-add it to the scheduler.
ENVOY_LOG_EVERY_POW_2(
warn, "WRSQ scheduler is used with a load balancer that mutates host weights with each "
"selection, this will likely result in poor selection performance");
add(new_weight, obj);
}

return obj;
}

return nullptr;
}

Random::RandomGenerator& random_;

// Objects already picked via peekAgain().
ObjQueue prepick_queue_;

// A mapping from an object weight to the associated queue.
QueueMap queue_map_;

// Stores the necessary information to perform a weighted random selection of the different
// queues. A cumulative sum is also kept of the total object weights for a queue, which allows for
// a single random number generation and a binary search to pick a queue.
std::vector<QueueInfo> cumulative_weights_;

// Keeps state that determines whether the cumulative weights need to be rebuilt. If any objects
// contained in a queue change from addition or expiry, it throws off the cumulative weight
// values. Therefore, they must be recalculated.
bool rebuild_cumulative_weights_{true};
};

} // namespace Upstream
} // namespace Envoy
23 changes: 22 additions & 1 deletion test/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,19 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "wrsq_scheduler_test",
srcs = ["wrsq_scheduler_test.cc"],
deps = [
"//source/common/upstream:scheduler_lib",
"//test/mocks:common_lib",
],
)

envoy_cc_test(
name = "edf_scheduler_test",
srcs = ["edf_scheduler_test.cc"],
deps = ["//source/common/upstream:edf_scheduler_lib"],
deps = ["//source/common/upstream:scheduler_lib"],
)

envoy_cc_test(
Expand Down Expand Up @@ -821,3 +830,15 @@ envoy_cc_fuzz_test(
":zone_aware_load_balancer_fuzz_lib",
],
)

envoy_cc_benchmark_binary(
name = "scheduler_benchmark",
srcs = ["scheduler_benchmark.cc"],
external_deps = [
"benchmark",
],
deps = [
"//source/common/common:random_generator_lib",
"//source/common/upstream:scheduler_lib",
],
)
Loading