Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move allocation awareness attributes to list setting #30626

Merged
merged 1 commit into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -544,20 +544,20 @@ public Set<String> getAllAllocationIds() {

static class AttributesKey {

final String[] attributes;
final List<String> attributes;

AttributesKey(String[] attributes) {
AttributesKey(List<String> attributes) {
this.attributes = attributes;
}

@Override
public int hashCode() {
return Arrays.hashCode(attributes);
return attributes.hashCode();
}

@Override
public boolean equals(Object obj) {
return obj instanceof AttributesKey && Arrays.equals(attributes, ((AttributesKey) obj).attributes);
return obj instanceof AttributesKey && attributes.equals(((AttributesKey) obj).attributes);
}
}

Expand Down Expand Up @@ -621,11 +621,11 @@ private static List<ShardRouting> collectAttributeShards(AttributesKey key, Disc
return Collections.unmodifiableList(to);
}

public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) {
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes) {
return preferAttributesActiveInitializingShardsIt(attributes, nodes, shuffler.nextSeed());
}

public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int seed) {
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes, int seed) {
AttributesKey key = new AttributesKey(attributes);
AttributesRoutings activeRoutings = getActiveAttribute(key, nodes);
AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -49,7 +50,7 @@ public class OperationRouting extends AbstractComponent {
Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

private String[] awarenessAttributes;
private List<String> awarenessAttributes;
private boolean useAdaptiveReplicaSelection;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
Expand All @@ -65,7 +66,7 @@ void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection;
}

private void setAwarenessAttributes(String[] awarenessAttributes) {
private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

Expand Down Expand Up @@ -139,7 +140,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts) {
if (preference == null || preference.isEmpty()) {
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
Expand Down Expand Up @@ -174,7 +175,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
}
// no more preference
if (index == -1 || index == preference.length() - 1) {
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
Expand Down Expand Up @@ -218,7 +219,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
}
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
return indexShard.activeInitializingShardsIt(routingHash);
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import com.carrotsearch.hppc.ObjectIntHashMap;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -34,6 +35,8 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import static java.util.Collections.emptyList;

/**
* This {@link AllocationDecider} controls shard allocation based on
* {@code awareness} key-value pairs defined in the node configuration.
Expand Down Expand Up @@ -78,13 +81,13 @@ public class AwarenessAllocationDecider extends AllocationDecider {

public static final String NAME = "awareness";

public static final Setting<String[]> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING =
new Setting<>("cluster.routing.allocation.awareness.attributes", "", s -> Strings.tokenizeToStringArray(s, ","), Property.Dynamic,
public static final Setting<List<String>> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING =
Setting.listSetting("cluster.routing.allocation.awareness.attributes", emptyList(), Function.identity(), Property.Dynamic,
Property.NodeScope);
public static final Setting<Settings> CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING =
Setting.groupSetting("cluster.routing.allocation.awareness.force.", Property.Dynamic, Property.NodeScope);

private volatile String[] awarenessAttributes;
private volatile List<String> awarenessAttributes;

private volatile Map<String, List<String>> forcedAwarenessAttributes;

Expand All @@ -109,7 +112,7 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
this.forcedAwarenessAttributes = forcedAwarenessAttributes;
}

private void setAwarenessAttributes(String[] awarenessAttributes) {
private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

Expand All @@ -124,7 +127,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
}

private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
return allocation.decision(Decision.YES, NAME,
"allocation awareness is not enabled, set cluster setting [%s] to enable it",
CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey());
Expand All @@ -138,7 +141,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
return allocation.decision(Decision.NO, NAME,
"node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]",
awarenessAttribute, CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(),
allocation.debugDecision() ? Strings.arrayToCommaDelimitedString(awarenessAttributes) : null);
allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null);
}

// build attr_value -> nodes map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexShardRoutingTableTests extends ESTestCase {
public void testEqualsAttributesKey() {
String[] attr1 = {"a"};
String[] attr2 = {"b"};
List<String> attr1 = Arrays.asList("a");
List<String> attr2 = Arrays.asList("b");
IndexShardRoutingTable.AttributesKey attributesKey1 = new IndexShardRoutingTable.AttributesKey(attr1);
IndexShardRoutingTable.AttributesKey attributesKey2 = new IndexShardRoutingTable.AttributesKey(attr1);
IndexShardRoutingTable.AttributesKey attributesKey3 = new IndexShardRoutingTable.AttributesKey(attr2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -50,7 +51,6 @@
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -224,11 +224,16 @@ public void testRandomRouting() {
}

public void testAttributePreferenceRouting() {
AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id,zone")
.build());
Settings.Builder settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always");
if (randomBoolean()) {
settings.put("cluster.routing.allocation.awareness.attributes", " rack_id, zone ");
} else {
settings.putList("cluster.routing.allocation.awareness.attributes", "rack_id", "zone");
}

AllocationService strategy = createAllocationService(settings.build());

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
Expand Down Expand Up @@ -258,15 +263,15 @@ public void testAttributePreferenceRouting() {
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

// after all are started, check routing iteration
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
ShardRouting shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node2"));

shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
Expand Down