Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rack-aware load balancing policy #73

Merged
merged 5 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -2213,6 +2213,50 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
unsigned used_hosts_per_remote_dc,
cass_bool_t allow_remote_dcs_for_local_cl);

/**
* Configures the cluster to use Rack-aware load balancing.
* For each query, all live nodes in a primary 'local' rack are tried first,
* followed by nodes from local DC and then nodes from other DCs.
*
* With empty local_rack and local_dc, default local_dc and local_rack
* is chosen from the first connected contact point,
* and no remote hosts are considered in query plans.
* If relying on this mechanism, be sure to use only contact
* points from the local rack.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] local_dc The primary data center to try first
* @param[in] local_rack The primary rack to try first
* @return CASS_OK if successful, otherwise an error occurred
*/
CASS_EXPORT CassError
cass_cluster_set_load_balance_rack_aware(CassCluster* cluster,
const char* local_dc,
const char* local_rack);


/**
* Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string
* parameters.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] local_dc
* @param[in] local_dc_length
* @return same as cass_cluster_set_load_balance_dc_aware()
*
* @see cass_cluster_set_load_balance_dc_aware()
*/
CASS_EXPORT CassError
cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster,
const char* local_dc,
size_t local_dc_length,
const char* local_rack,
size_t local_rack_length);

/**
* Configures the cluster to use token-aware request routing or not.
*
Expand Down
3 changes: 3 additions & 0 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "constants.hpp"
#include "dc_aware_policy.hpp"
#include "rack_aware_policy.hpp"
#include "external.hpp"
#include "logger.hpp"
#include "resolver.hpp"
Expand Down Expand Up @@ -240,6 +241,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
const ControlConnectionSchema& schema,
const LoadBalancingPolicy::Ptr& load_balancing_policy,
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
const String& local_rack,
const StringMultimap& supported_options, const ClusterSettings& settings)
: connection_(connection)
, listener_(listener ? listener : &nop_cluster_listener__)
Expand All @@ -251,6 +253,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
, connected_host_(connected_host)
, hosts_(hosts)
, local_dc_(local_dc)
, local_rack_(local_rack)
, supported_options_(supported_options)
, is_recording_events_(settings.disable_events_on_startup) {
static const auto optimized_msg = "===== Using optimized driver!!! =====\n";
Expand Down
4 changes: 4 additions & 0 deletions src/cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class Cluster
* determining the next control connection host.
* @param load_balancing_policies
* @param local_dc The local datacenter determined by the metadata service for initializing the
* @param local_rack The local rack determined by the metadata service for initializing the
* load balancing policies.
* @param supported_options Supported options discovered during control connection.
* @param settings The control connection settings to use for reconnecting the
Expand All @@ -267,6 +268,7 @@ class Cluster
const ControlConnectionSchema& schema,
const LoadBalancingPolicy::Ptr& load_balancing_policy,
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
const String& local_rack,
const StringMultimap& supported_options, const ClusterSettings& settings);

/**
Expand Down Expand Up @@ -361,6 +363,7 @@ class Cluster
const Host::Ptr& connected_host() const { return connected_host_; }
const TokenMap::Ptr& token_map() const { return token_map_; }
const String& local_dc() const { return local_dc_; }
const String& local_rack() const { return local_rack_; }
const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
const StringMultimap& supported_options() const { return supported_options_; }
const ShardPortCalculator* shard_port_calculator() const { return shard_port_calculator_.get(); }
Expand Down Expand Up @@ -449,6 +452,7 @@ class Cluster
PreparedMetadata prepared_metadata_;
TokenMap::Ptr token_map_;
String local_dc_;
String local_rack_;
StringMultimap supported_options_;
Timer timer_;
bool is_recording_events_;
Expand Down
21 changes: 21 additions & 0 deletions src/cluster_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,27 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c
return CASS_OK;
}

CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc,
const char* local_rack) {
if (local_dc == NULL || local_rack == NULL) {
return CASS_ERROR_LIB_BAD_PARAMS;
}
return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc),
local_rack, SAFE_STRLEN(local_rack));
}

CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc,
size_t local_dc_length,
const char* local_rack,
size_t local_rack_length) {
if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) {
return CASS_ERROR_LIB_BAD_PARAMS;
}
cluster->config().set_load_balancing_policy(new RackAwarePolicy(
String(local_dc, local_dc_length), String(local_rack, local_rack_length)));
return CASS_OK;
}

void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) {
cluster->config().set_token_aware_routing(enabled == cass_true);
}
Expand Down
11 changes: 9 additions & 2 deletions src/cluster_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "cluster_connector.hpp"
#include "dc_aware_policy.hpp"
#include "rack_aware_policy.hpp"
#include "protocol.hpp"
#include "random.hpp"
#include "round_robin_policy.hpp"
Expand Down Expand Up @@ -177,6 +178,7 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) {
}

local_dc_ = resolver->local_dc();
local_rack_ = resolver->local_rack();
remaining_connector_count_ = resolved_contact_points.size();
for (AddressVec::const_iterator it = resolved_contact_points.begin(),
end = resolved_contact_points.end();
Expand Down Expand Up @@ -231,7 +233,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
it != end; ++it) {
LoadBalancingPolicy::Ptr policy(*it);
policy->init(connected_host, hosts, random_, local_dc_);
policy->init(connected_host, hosts, random_, local_dc_, local_rack_);
policy->register_handles(event_loop_->loop());
}

Expand All @@ -248,6 +250,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
message = "No hosts available for the control connection using the "
"DC-aware load balancing policy. "
"Check to see if the configured local datacenter is valid";
} else if (dynamic_cast<RackAwarePolicy::RackAwareQueryPlan*>(query_plan.get()) !=
NULL) { // Check if Rack-aware
message = "No hosts available for the control connection using the "
"Rack-aware load balancing policy. "
"Check to see if the configured local datacenter and rack is valid";
} else {
message = "No hosts available for the control connection using the "
"configured load balancing policy";
Expand All @@ -258,7 +265,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {

cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_,
connected_host, hosts, connector->schema(), default_policy, policies,
local_dc_, connector->supported_options(), settings_));
local_dc_, local_rack_, connector->supported_options(), settings_));

// Clear any connection errors and set the final negotiated protocol version.
error_code_ = CLUSTER_OK;
Expand Down
1 change: 1 addition & 0 deletions src/cluster_connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
Random* random_;
Metrics* metrics_;
String local_dc_;
String local_rack_;
ClusterSettings settings_;

Callback callback_;
Expand Down
2 changes: 2 additions & 0 deletions src/cluster_metadata_resolver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {

const AddressVec& resolved_contact_points() const { return resolved_contact_points_; }
const String& local_dc() const { return local_dc_; }
const String& local_rack() const { return local_rack_; }

protected:
virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) = 0;
Expand All @@ -57,6 +58,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
protected:
AddressVec resolved_contact_points_;
String local_dc_;
String local_rack_;
Callback callback_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/dc_aware_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remot
DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }

void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc) {
const String& local_dc, const String& local_rack) {
if (local_dc_.empty()) { // Only override if no local DC was specified.
local_dc_ = local_dc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/dc_aware_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
~DCAwarePolicy();

virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc);
const String& local_dc, const String& local_rack);

virtual CassHostDistance distance(const Host::Ptr& host) const;

Expand Down
1 change: 1 addition & 0 deletions src/execution_profile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "cassandra.h"
#include "constants.hpp"
#include "dc_aware_policy.hpp"
#include "rack_aware_policy.hpp"
#include "dense_hash_map.hpp"
#include "latency_aware_policy.hpp"
#include "speculative_execution.hpp"
Expand Down
4 changes: 2 additions & 2 deletions src/latency_aware_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ using namespace datastax::internal;
using namespace datastax::internal::core;

void LatencyAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc) {
const String& local_dc, const String& local_rack) {
hosts_->reserve(hosts.size());
std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured);
}
ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc);
ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack);
}

void LatencyAwarePolicy::register_handles(uv_loop_t* loop) { start_timer(loop); }
Expand Down
2 changes: 1 addition & 1 deletion src/latency_aware_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy {
virtual ~LatencyAwarePolicy() {}

virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc);
const String& local_dc, const String& local_rack);

virtual void register_handles(uv_loop_t* loop);
virtual void close_handles();
Expand Down
4 changes: 2 additions & 2 deletions src/list_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ using namespace datastax::internal;
using namespace datastax::internal::core;

void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc) {
const String& local_dc, const String& local_rack) {
HostMap valid_hosts;
for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
const Host::Ptr& host = i->second;
Expand All @@ -36,7 +36,7 @@ void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Ran
LOG_ERROR("No valid hosts available for list policy");
}

ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc);
ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc, local_rack);
}

CassHostDistance ListPolicy::distance(const Host::Ptr& host) const {
Expand Down
2 changes: 1 addition & 1 deletion src/list_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy {
virtual ~ListPolicy() {}

virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc);
const String& local_dc, const String& local_rack);

virtual CassHostDistance distance(const Host::Ptr& host) const;

Expand Down
18 changes: 15 additions & 3 deletions src/load_balancing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,21 @@ typedef enum CassBalancingState_ {
CASS_BALANCING_NEW_QUERY_PLAN
} CassBalancingState;

// CassHostDistance specifies how far a host is from the client.
// The meaning of the distance depends on the load balancing policy.
// The policies should assign the distance starting from the lowest
// without skipping values, i.e. they should start with LOCAL.
// For example:
// - DCAwarePolicy uses LOCAL for same DC, REMOTE for different DC.
// - RackAwarePolicy uses LOCAL for same rack,
// REMOTE for different rack and same DC, and
// REMOTE2 for different DC.
// - RoundRobinPolicy has distinguishes only one distance level and
// always uses LOCAL for all nodes.
typedef enum CassHostDistance_ {
CASS_HOST_DISTANCE_LOCAL,
CASS_HOST_DISTANCE_REMOTE,
CASS_HOST_DISTANCE_REMOTE2,
CASS_HOST_DISTANCE_IGNORE
} CassHostDistance;

Expand Down Expand Up @@ -87,7 +99,7 @@ class LoadBalancingPolicy : public RefCounted<LoadBalancingPolicy> {
virtual ~LoadBalancingPolicy() {}

virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc) = 0;
const String& local_dc, const String &local_rack) = 0;

virtual void register_handles(uv_loop_t* loop) {}
virtual void close_handles() {}
Expand Down Expand Up @@ -124,8 +136,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
virtual ~ChainedLoadBalancingPolicy() {}

virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
const String& local_dc) {
return child_policy_->init(connected_host, hosts, random, local_dc);
const String& local_dc, const String& local_rack) {
return child_policy_->init(connected_host, hosts, random, local_dc, local_rack);
}

virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; }
Expand Down
Loading