Skip to content

Commit

Permalink
xds: support ring_hash as the endpoint-level LB policy (#7991)
Browse files Browse the repository at this point in the history
Update LB policy config generation to support ring hash policy as the endpoint-level LB policy.

- Changed the CDS LB policy to accept RING_HASH as the endpoint LB policy from CDS updates. This configuration is directly passed to its child policy (aka, ClusterResolverLoadBalancer) in its config.

- Changed ClusterResolverLoadBalancer to generate different LB configs for its downstream LB policies, depending on the endpoint-level LB policies.
  - If the endpoint-level LB policy is ROUND_ROBIN, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> WeightedTargetLB -> RoundRobinLB
  - If the endpoin-level LB policy is RNIG_HASH, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> RingHashLB.
  • Loading branch information
voidzcy authored Apr 16, 2021
1 parent 31cfb6d commit b4fe07d
Show file tree
Hide file tree
Showing 8 changed files with 420 additions and 256 deletions.
19 changes: 11 additions & 8 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -181,15 +183,16 @@ private void handleClusterDiscovered() {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return;
}
String endpointPickingPolicy = root.result.lbPolicy();
LoadBalancerProvider localityPickingLbProvider =
lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded
LoadBalancerProvider endpointPickingLbProvider =
lbRegistry.getProvider(endpointPickingPolicy);
LoadBalancerProvider lbProvider = null;
Object lbConfig = null;
if (root.result.lbPolicy() == LbPolicy.RING_HASH) {
lbProvider = lbRegistry.getProvider("ring_hash");
lbConfig = new RingHashConfig(root.result.minRingSize(), root.result.maxRingSize());
} else {
lbProvider = lbRegistry.getProvider("round_robin");
}
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances),
new PolicySelection(localityPickingLbProvider, null /* by cluster_resolver LB policy */),
new PolicySelection(endpointPickingLbProvider, null));
Collections.unmodifiableList(instances), new PolicySelection(lbProvider, lbConfig));
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
Expand Down
9 changes: 3 additions & 6 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.github.udpa.udpa.type.v1.TypedStruct;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -832,19 +831,17 @@ private static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEds
}

CdsUpdate.Builder updateBuilder = structOrError.getStruct();
String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to(
CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name());

if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) {
throw new ResourceInvalidException(
"Unsupported ring hash function: " + lbConfig.getHashFunction());
}
updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(),
lbConfig.getMaximumRingSize().getValue());
updateBuilder.lbPolicy(CdsUpdate.LbPolicy.RING_HASH,
lbConfig.getMinimumRingSize().getValue(), lbConfig.getMaximumRingSize().getValue());
} else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) {
updateBuilder.lbPolicy(lbPolicy);
updateBuilder.lbPolicy(CdsUpdate.LbPolicy.ROUND_ROBIN);
} else {
throw new ResourceInvalidException("Unsupported lb policy: " + cluster.getLbPolicy());
}
Expand Down
102 changes: 58 additions & 44 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -76,7 +77,8 @@
* used in the downstream LB policies for fine-grained load balancing purposes.
*/
final class ClusterResolverLoadBalancer extends LoadBalancer {

// DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
// to an empty locality.
private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
private final XdsLogger logger;
private final String authority;
Expand Down Expand Up @@ -156,12 +158,7 @@ private final class ClusterResolverLbState extends LoadBalancer {
private final Helper helper;
private final List<String> clusters = new ArrayList<>();
private final Map<String, ClusterState> clusterStates = new HashMap<>();
// An aggregate cluster is thought of as a cluster that groups the endpoints of the underlying
// clusters together for load balancing purposes only. Load balancing policies (both locality
// level and endpoint level) are configured by the aggregate cluster and apply to all of its
// underlying clusters.
private PolicySelection localityPickingPolicy;
private PolicySelection endpointPickingPolicy;
private PolicySelection endpointLbPolicy;
private ResolvedAddresses resolvedAddresses;
private LoadBalancer childLb;

Expand All @@ -175,20 +172,18 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
this.resolvedAddresses = resolvedAddresses;
ClusterResolverConfig config =
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
localityPickingPolicy = config.localityPickingPolicy;
endpointPickingPolicy = config.endpointPickingPolicy;
endpointLbPolicy = config.lbPolicy;
for (DiscoveryMechanism instance : config.discoveryMechanisms) {
clusters.add(instance.cluster);
ClusterState state;
if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.lrsServerName, instance.maxConcurrentRequests, instance.tlsContext);
clusterStates.put(instance.cluster, state);
} else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.lrsServerName,
instance.maxConcurrentRequests, instance.tlsContext);
clusterStates.put(instance.cluster, state);
}
clusterStates.put(instance.cluster, state);
state.start();
}
}
Expand Down Expand Up @@ -392,8 +387,11 @@ public void run() {
for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
if (endpoint.isHealthy()) {
discard = false;
long weight =
(long) localityLbInfo.localityWeight() * endpoint.loadBalancingWeight();
Attributes attr = endpoint.eag().getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality).build();
.set(InternalXdsAttributes.ATTR_LOCALITY, locality)
.set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight).build();
EquivalentAddressGroup eag =
new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
eag = AddressFilter.setPathFilter(
Expand All @@ -419,10 +417,10 @@ public void run() {
}
List<String> priorities = new ArrayList<>(prioritizedLocalityWeights.keySet());
Collections.sort(priorities);
Map<String, PriorityChildConfig> priorityChildConfigs = generatePriorityChildConfigs(
name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
localityPickingPolicy, endpointPickingPolicy, true, lbRegistry,
prioritizedLocalityWeights, dropOverloads);
Map<String, PriorityChildConfig> priorityChildConfigs =
generateEdsBasedPriorityChildConfigs(
name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads);
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities);
Expand Down Expand Up @@ -532,22 +530,22 @@ public void run() {
return;
}
backoffPolicy = null; // reset backoff sequence if succeeded
// Arbitrary priority notation for all DNS-resolved endpoints.
String priorityName = priorityName(name, 0); // value doesn't matter
List<EquivalentAddressGroup> addresses = new ArrayList<>();
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
Attributes attr = eag.getAttributes().toBuilder().set(
InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
eag = AddressFilter.setPathFilter(
eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString()));
addresses.add(eag);
}
PolicySelection endpointPickingPolicy =
new PolicySelection(lbRegistry.getProvider("pick_first"), null);
PriorityChildConfig priorityChildConfig = generatePriorityChildConfig(
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
endpointPickingPolicy, false, lbRegistry,
Collections.<DropOverload>emptyList());
lbRegistry, Collections.<DropOverload>emptyList());
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
Expand Down Expand Up @@ -614,58 +612,74 @@ private static class ClusterResolutionResult {
}

/**
* Generates the config to be used in the priority LB policy for a single priority.
* Generates the config to be used in the priority LB policy for the single priority of
* logical DNS cluster.
*
* <p>priority LB -> cluster_impl LB -> pick_first
* <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
*/
private static PriorityChildConfig generatePriorityChildConfig(
private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
PolicySelection endpointPickingPolicy, boolean ignoreReresolution,
LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) {
// Override endpoint-level LB policy with pick_first for logical DNS cluster.
PolicySelection endpointLbPolicy =
new PolicySelection(lbRegistry.getProvider("pick_first"), null);
ClusterImplConfig clusterImplConfig =
new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests,
dropOverloads, endpointPickingPolicy, tlsContext);
dropOverloads, endpointLbPolicy, tlsContext);
LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
PolicySelection clusterImplPolicy =
new PolicySelection(clusterImplLbProvider, clusterImplConfig);
return new PriorityChildConfig(clusterImplPolicy, ignoreReresolution);
return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
}

/**
* Generates configs to be used in the priority LB policy for priorities in the cluster.
* Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
*
* <p>priority LB -> cluster_impl LB (one per priority) -> weighted_target LB
* -> round_robin (one per locality))
* <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
* -> round_robin (one per locality)) / ring_hash
*/
private static Map<String, PriorityChildConfig> generatePriorityChildConfigs(
private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy,
boolean ignoreReresolution, LoadBalancerRegistry lbRegistry,
PolicySelection endpointLbPolicy, LoadBalancerRegistry lbRegistry,
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights,
List<DropOverload> dropOverloads) {
Map<String, PriorityChildConfig> configs = new HashMap<>();
for (String priority : prioritizedLocalityWeights.keySet()) {
Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority);
Map<String, WeightedPolicySelection> targets = new HashMap<>();
for (Locality locality : localityWeights.keySet()) {
int weight = localityWeights.get(locality);
targets.put(localityName(locality),
new WeightedPolicySelection(weight, endpointPickingPolicy));
PolicySelection leafPolicy = endpointLbPolicy;
// Depending on the endpoint-level load balancing policy, different LB hierarchy may be
// created. If the endpoint-level LB policy is round_robin, it creates a two-level LB
// hierarchy: a locality-level LB policy that balances load according to locality weights
// followed by an endpoint-level LB policy that simply rounds robin the endpoints within
// the locality. If the endpoint-level LB policy is ring_hash, it creates a unified LB
// policy that balances load by weighing the product of each endpoint's weight and the
// weight of the locality it belongs to.
if (endpointLbPolicy.getProvider().getPolicyName().equals("round_robin")) {
Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority);
Map<String, WeightedPolicySelection> targets = new HashMap<>();
for (Locality locality : localityWeights.keySet()) {
int weight = localityWeights.get(locality);
WeightedPolicySelection target = new WeightedPolicySelection(weight, endpointLbPolicy);
targets.put(localityName(locality), target);
}
LoadBalancerProvider weightedTargetLbProvider =
lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME);
WeightedTargetConfig weightedTargetConfig =
new WeightedTargetConfig(Collections.unmodifiableMap(targets));
leafPolicy = new PolicySelection(weightedTargetLbProvider, weightedTargetConfig);
}
PolicySelection localityPicking = new PolicySelection(
localityPickingPolicy.getProvider(),
new WeightedTargetConfig(Collections.unmodifiableMap(targets)));
ClusterImplConfig clusterImplConfig =
new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests,
dropOverloads, localityPicking, tlsContext);
dropOverloads, leafPolicy, tlsContext);
LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
PolicySelection clusterImplPolicy =
new PolicySelection(clusterImplLbProvider, clusterImplConfig);
configs.put(priority, new PriorityChildConfig(clusterImplPolicy, ignoreReresolution));
PriorityChildConfig priorityChildConfig =
new PriorityChildConfig(clusterImplPolicy, true /* ignoreReresolution */);
configs.put(priority, priorityChildConfig);
}
return configs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,17 @@ public LoadBalancer newLoadBalancer(Helper helper) {
static final class ClusterResolverConfig {
// Ordered list of clusters to be resolved.
final List<DiscoveryMechanism> discoveryMechanisms;
final PolicySelection localityPickingPolicy;
final PolicySelection endpointPickingPolicy;
// Endpoint-level load balancing policy with config (round_robin or ring_hash).
final PolicySelection lbPolicy;

ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms,
PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy) {
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, PolicySelection lbPolicy) {
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
this.localityPickingPolicy = checkNotNull(localityPickingPolicy, "localityPickingPolicy");
this.endpointPickingPolicy = checkNotNull(endpointPickingPolicy, "endpointPickingPolicy");
this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy");
}

@Override
public int hashCode() {
return Objects.hash(discoveryMechanisms, localityPickingPolicy, endpointPickingPolicy);
return Objects.hash(discoveryMechanisms, lbPolicy);
}

@Override
Expand All @@ -92,16 +90,14 @@ public boolean equals(Object o) {
}
ClusterResolverConfig that = (ClusterResolverConfig) o;
return discoveryMechanisms.equals(that.discoveryMechanisms)
&& localityPickingPolicy.equals(that.localityPickingPolicy)
&& endpointPickingPolicy.equals(that.endpointPickingPolicy);
&& lbPolicy.equals(that.lbPolicy);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("discoveryMechanisms", discoveryMechanisms)
.add("localityPickingPolicy", localityPickingPolicy)
.add("endpointPickingPolicy", endpointPickingPolicy)
.add("lbPolicy", lbPolicy)
.toString();
}

Expand Down
Loading

0 comments on commit b4fe07d

Please sign in to comment.