Weighted load balancing policy #1922

Draft · wants to merge 1 commit into base: 4.x
@@ -853,6 +853,41 @@ public enum DefaultDriverOption implements DriverOption {
LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS(
"advanced.load-balancing-policy.dc-failover.allow-for-local-consistency-levels"),

/**
* How many items in the plan to score.
*
* <p>Value-Type: int
*/
LOAD_BALANCING_SCORED_PLAN_SIZE("advanced.load-balancing-policy.weighted.scored-plan-size"),

/**
* Weight to apply when load balancing for a non-rack node.
*
* <p>Value-Type: double
*/
LOAD_BALANCING_WEIGHT_NON_RACK("advanced.load-balancing-policy.weighted.non-rack"),

/**
* Weight to apply when load balancing for a non-replica node.
*
* <p>Value-Type: double
*/
LOAD_BALANCING_WEIGHT_NON_REPLICA("advanced.load-balancing-policy.weighted.non-replica"),

/**
* Weight to apply when load balancing for a node that is still starting up.
*
* <p>Value-Type: double
*/
LOAD_BALANCING_WEIGHT_STARTING("advanced.load-balancing-policy.weighted.starting"),

/**
* Weight to apply when load balancing for an unhealthy node.
*
* <p>Value-Type: double
*/
LOAD_BALANCING_WEIGHT_UNHEALTHY("advanced.load-balancing-policy.weighted.unhealthy"),

/**
* The classname of the desired {@code MetricIdGenerator} implementation.
*
…
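To try these options out, here is a minimal sketch of setting them programmatically through the driver's standard DriverConfigLoader.programmaticBuilder(). It assumes this PR's new options and the policy class are on the classpath; the query is a placeholder.

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;

public class WeightedPolicyConfigExample {
  public static void main(String[] args) {
    // Select the weighted policy and override two of the new options; anything
    // not set here falls back to the defaults in reference.conf.
    DriverConfigLoader loader =
        DriverConfigLoader.programmaticBuilder()
            .withString(
                DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
                "com.datastax.oss.driver.internal.core.loadbalancing.WeightedLoadBalancingPolicy")
            .withInt(DefaultDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE, 8)
            .withDouble(DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA, 12.0)
            .build();

    try (CqlSession session = CqlSession.builder().withConfigLoader(loader).build()) {
      session.execute("SELECT release_version FROM system.local");
    }
  }
}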
@@ -891,6 +891,36 @@ public String toString() {
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS,
GenericType.BOOLEAN);
/** How many items in the plan to score. */
public static final TypedDriverOption<Integer>
LOAD_BALANCING_SCORED_PLAN_SIZE =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE,
GenericType.INTEGER);
/** Weight to apply when load balancing for a non-rack node. */
public static final TypedDriverOption<Double>
LOAD_BALANCING_WEIGHT_NON_RACK =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_RACK,
GenericType.DOUBLE);
/** Weight to apply when load balancing for a non-replica node. */
public static final TypedDriverOption<Double>
LOAD_BALANCING_WEIGHT_NON_REPLICA =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA,
GenericType.DOUBLE);
/** Weight to apply when load balancing for a node that is still starting up. */
public static final TypedDriverOption<Double>
LOAD_BALANCING_WEIGHT_STARTING =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_STARTING,
GenericType.DOUBLE);
/** Weight to apply when load balancing for an unhealthy node. */
public static final TypedDriverOption<Double>
LOAD_BALANCING_WEIGHT_UNHEALTHY =
new TypedDriverOption<>(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_UNHEALTHY,
GenericType.DOUBLE);

private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
try {
…
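The typed constants above also open up the OptionsMap route; a minimal sketch, again assuming this PR's constants are available:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;

public class WeightedPolicyOptionsMapExample {
  public static void main(String[] args) {
    // Start from the driver defaults and override the weighted-policy knobs.
    OptionsMap options = OptionsMap.driverDefaults();
    options.put(TypedDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE, 8);
    options.put(TypedDriverOption.LOAD_BALANCING_WEIGHT_STARTING, 16.0);
    options.put(TypedDriverOption.LOAD_BALANCING_WEIGHT_UNHEALTHY, 64.0);

    try (CqlSession session =
        CqlSession.builder().withConfigLoader(DriverConfigLoader.fromMap(options)).build()) {
      session.execute("SELECT release_version FROM system.local");
    }
  }
}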
@@ -0,0 +1,265 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.loadbalancing;

import static java.util.concurrent.TimeUnit.SECONDS;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A load balancing policy that balances load between the local token holder, rack replicas,
 * and local datacenter replicas (in that order).
 *
 * <p>The default weights suit the vast majority of use cases, but they can be tuned for
 * different behavior.
 */
public class WeightedLoadBalancingPolicy extends DefaultLoadBalancingPolicy {
private static final Logger LOG =
LoggerFactory.getLogger(WeightedLoadBalancingPolicy.class);
// Each client randomly skews its delay so traffic is introduced gradually to a newly up
// replica: each client starts sending after a delay of between 15s and 30s, then gradually
// increases load over the following 15 seconds.
private static final long DELAY_TRAFFIC_SKEW_MILLIS = SECONDS.toMillis(15);
private static final long DELAY_TRAFFIC_MILLIS =
DELAY_TRAFFIC_SKEW_MILLIS + ThreadLocalRandom.current().nextLong(DELAY_TRAFFIC_SKEW_MILLIS);

// By default we only score this many nodes; the rest are added to the plan without scoring.
// We don't usually need to score every single node if there are more than a few.
static final int DEFAULT_SCORED_PLAN_SIZE = 8;
// Default multiplicative weights. Think of each weight as "how much concurrency must build
// up before I fail over from this node to the next". Note that these defaults are
// intentionally chosen to shed load onto unloaded rack coordinators when a replica set is
// all relatively heavily loaded (specifically, 3x as loaded).
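// For example, with these defaults an idle non-replica in the local rack scores
// (1 + 0) * 12.0 = 12.0, the same as an in-rack replica with 11 outstanding requests
// ((1 + 11) * 1.0): about 12 requests of concurrency must build up on a rack replica
// before an idle rack-local coordinator is preferred.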
static final double DEFAULT_WEIGHT_NON_RACK = 4.0;
static final double DEFAULT_WEIGHT_NON_REPLICA = 12.0;
static final double DEFAULT_WEIGHT_STARTING = 16.0;
static final double DEFAULT_WEIGHT_UNHEALTHY = 64.0;

private final int planSize;
private final double weightNonRack;
private final double weightNonReplica;
private final double weightStarting;
private final double weightUnhealthy;

public WeightedLoadBalancingPolicy(
@NonNull DriverContext context,
@NonNull String profileName) {
super(context, profileName);
this.planSize = profile.getInt(DefaultDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE, DEFAULT_SCORED_PLAN_SIZE);
// Choices of weights will change how this load balancer prefers endpoints.
// The weight is relative to the outstanding concurrency.
this.weightNonRack = profile.getDouble(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_RACK, DEFAULT_WEIGHT_NON_RACK);
this.weightNonReplica = profile.getDouble(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA, DEFAULT_WEIGHT_NON_REPLICA);
this.weightStarting = profile.getDouble(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_STARTING, DEFAULT_WEIGHT_STARTING);
this.weightUnhealthy = profile.getDouble(
DefaultDriverOption.LOAD_BALANCING_WEIGHT_UNHEALTHY, DEFAULT_WEIGHT_UNHEALTHY);
}

@NonNull
@Override
public Queue<Node> newQueryPlan(Request request, Session session) {
if (session == null) {
return super.newQueryPlan(request, null);
}

// Take a copy of the nodes and a reference to the replicas, since the node map is concurrent
Set<Node> dcNodeSet = getLiveNodes().dc(getLocalDatacenter());
Set<Node> replicaSet = getReplicas(request, session);

long nowNanos = nanoTime();
long nowMillis = milliTime();

// collect known replica nodes
List<NodeScore> nodeScores = new ArrayList<>(this.planSize);
for (Node replicaNode : replicaSet) {
if (dcNodeSet.contains(replicaNode)) {
nodeScores.add(new NodeScore(replicaNode,
getWeightedScore(replicaNode, session, nowMillis, nowNanos, true)));

if (nodeScores.size() == this.planSize) {
break; // TODO (akhaku) add the rest of the nodes once we port the tests to OSS
}
}
}

// collect any non-replicas, if we need to and there are some available
if (nodeScores.size() < this.planSize && nodeScores.size() < dcNodeSet.size()) {
Random rand = getRandom();
final Node[] dcNodes = dcNodeSet.toArray(new Node[0]);

for (int i = 0; i < dcNodes.length; i++) {
// pick a random target; shuffle it up front, so we don't revisit
int nextIndex = i + rand.nextInt(dcNodes.length - i);
ArrayUtils.swap(dcNodes, i, nextIndex);
Node dcNode = dcNodes[i];

// skip replicas; they were already inserted
// otherwise, found a valid node: score it
if (!replicaSet.contains(dcNode)) {
nodeScores.add(new NodeScore(dcNode,
getWeightedScore(dcNode, session, nowMillis, nowNanos, false)));

// stop once we have scored planSize nodes, or every node in the datacenter
if (nodeScores.size() == this.planSize || nodeScores.size() == dcNodes.length) {
break;
}
}
}

// at this point we have scored enough nodes to meet planSize or, failing that, every node available
}

// At this point we have a small list (typically 8 elements) containing the local
// datacenter replicas and potentially some random choices from the rest of the datacenter.
//
// We now rank nodes by a score function that takes into account outstanding requests weighted
// by replica status, rack placement, uptime, and health status. In general, we expect to
// see the following order
// 1. Rack replicas
// 2. Datacenter replicas
// 3. Rack nodes
// 4. Datacenter nodes
// We expect these orderings to move around when nodes are overloaded. For example if the
// local zone replica has too much load we will failover to other replicas. If those
// replicas are too slow we will failover to other rack nodes.

// sort, extract, convert to query plan
nodeScores.sort(Comparator.comparingDouble(NodeScore::getScore));
Node[] candidate = new Node[nodeScores.size()];
for (int i = 0; i < candidate.length; i++) {
candidate[i] = nodeScores.get(i).getNode();
}

QueryPlan queryPlan = candidate.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan((Object[]) candidate);
return maybeAddDcFailover(request, queryPlan);
}

protected String getLocalRack() {
return ""; // TODO (akhaku) internally we passed it through the context, for OSS perhaps something like the local DC helper?
}

protected boolean inRack(Node node) {
if (node == null || node.getRack() == null) return false;
return node.getRack().equals(this.getLocalRack());
}

protected double getWeightedScore(Node node, Session session, long nowMillis, long nowNanos, boolean isReplica) {
int base = Math.min(32768, 1 + getInFlight(node, session));
double weight = 1.0;

if (!inRack(node)) weight *= this.weightNonRack; // 4.0
if (!isReplica) weight *= this.weightNonReplica; // 12.0
if (isUnhealthy(node, session, nowNanos)) weight *= this.weightUnhealthy; // 64.0

// We want to gradually ramp up traffic, shedding heavily at first and then allowing it back
// in gradually. Note:
//
// 1. We cannot use nowNanos for this since node.getUpSinceMillis uses
// System.currentTimeMillis (which may have no relation to nano time).
// 2. getUpSinceMillis might return 0 or -1, in either case don't consider it freshly up.
// 3. When a client starts up everything will appear to be freshly up, which is fine since
// all nodes will randomly be shuffled to the front and back.
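// 4. Worked example: with a total window of 40s (the 15s base, plus say 10s of this
//    client's random delay, plus the 15s skew), a node that came up 10s ago is
//    penalized with probability 1 - 10/40 = 0.75, so roughly three quarters of
//    requests apply the "starting" weight to it.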
long millisSinceUp = nowMillis - node.getUpSinceMillis();
if (millisSinceUp < (DELAY_TRAFFIC_MILLIS + DELAY_TRAFFIC_SKEW_MILLIS)) {
double pShed = 1.0 - ((double) millisSinceUp / (DELAY_TRAFFIC_MILLIS + DELAY_TRAFFIC_SKEW_MILLIS));
if (pShed > getRandom().nextDouble()) {
if (LOG.isTraceEnabled()) {
String shed = String.format("%.2f", pShed);
LOG.trace("[{}] shedding at startup [pShed={}, millisSinceUp={}]", node, shed, millisSinceUp);
}
weight *= this.weightStarting; // 16.0
}
}

double score = base * weight;
if (LOG.isDebugEnabled()) {
String msg = String.format("score=%.2f [base=%d, weight=%.2f]", score, base, weight);
LOG.debug("[{}] {}", node, msg);
}
return score;
}

protected long milliTime() {
return System.currentTimeMillis();
}

protected Random getRandom() {
return ThreadLocalRandom.current();
}

/**
 * Wraps a Node alongside its score.
 *
 * <p>Calculating scores is expensive and not stable (repeated calls may return different
 * values). Wrapping ensures each score is calculated only once and does not change while
 * the plan is being built.
 */
static class NodeScore {
final double score;
final Node node;

public NodeScore(Node node, double score) {
this.node = node;
this.score = score;
}

public Node getNode() {
return node;
}

public double getScore() {
return score;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NodeScore nodeScore = (NodeScore) o;
return Double.compare(score, nodeScore.score) == 0 && Objects.equals(node, nodeScore.node);
}

@Override
public int hashCode() {
return Objects.hash(score, node);
}
}
}
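Since getLocalRack() is stubbed out pending the TODO above, rack awareness is inert in this port until the rack is wired through the context. One possible stopgap, sketched under the assumption that you register the subclass as basic.load-balancing-policy.class; the system property name is hypothetical:

package com.example.lb; // hypothetical package

import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.internal.core.loadbalancing.WeightedLoadBalancingPolicy;

public class RackAwareWeightedLoadBalancingPolicy extends WeightedLoadBalancingPolicy {
  private final String localRack;

  public RackAwareWeightedLoadBalancingPolicy(DriverContext context, String profileName) {
    super(context, profileName);
    // Hypothetical source of the rack name; substitute your own discovery mechanism.
    this.localRack = System.getProperty("cassandra.local-rack", "");
  }

  @Override
  protected String getLocalRack() {
    return localRack;
  }
}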
44 changes: 44 additions & 0 deletions core/src/main/resources/reference.conf
@@ -575,6 +575,50 @@ datastax-java-driver {
# Overridable in a profile: yes
allow-for-local-consistency-levels = false
}

# These configuration options apply when using the WeightedLoadBalancingPolicy.
# That policy computes a multiplicative weight for up to 8 nodes (configurable via
# scored-plan-size), multiplies each weight by one plus the node's in-flight request count,
# and sorts the nodes by the resulting score. The default weights prefer in-rack replicas,
# then other local-datacenter replicas, then in-rack non-replicas, then the rest of the
# local datacenter, with "unhealthy" nodes pushed to the back. There is also an aversion to
# nodes that have recently started up, based on the node's advertised "millis since up".
# Changing the weights changes the order of preference.
weighted {
# How many items in the plan to score.
#
# Required: no
# Modifiable at runtime: no
# Overridable in a profile: yes
// scored-plan-size: 8

# Weight to apply when load balancing for a non-rack node.
#
# Required: no
# Modifiable at runtime: no
# Overridable in a profile: yes
// non-rack: 4.0

# Weight to apply when load balancing for a non-replica node.
#
# Required: no
# Modifiable at runtime: no
# Overridable in a profile: yes
// non-replica: 12.0

# Weight to apply when load balancing for a node that is still starting up.
#
# Required: no
# Modifiable at runtime: no
# Overridable in a profile: yes
// starting: 16.0

# Weight to apply when load balancing for an unhealthy node.
#
# Required: no
# Modifiable at runtime: no
# Overridable in a profile: yes
// unhealthy: 64.0
}
}
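Every option in this block is overridable in a profile, so different workloads can use different weights. A minimal sketch with a hypothetical "batch" profile that tolerates non-rack, non-replica coordinators more readily:

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;

public class WeightedProfileExample {
  static DriverConfigLoader buildLoader() {
    // The default profile keeps the values above; the "batch" profile halves
    // the penalty for leaving the rack or the replica set.
    return DriverConfigLoader.programmaticBuilder()
        .startProfile("batch")
        .withDouble(DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_RACK, 2.0)
        .withDouble(DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA, 6.0)
        .endProfile()
        .build();
  }
}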

# Whether to schedule reconnection attempts if all contact points are unreachable on the first
…