Skip to content

Commit

Permalink
Auto-expand replicas when adding or removing nodes (#30423)
Browse files Browse the repository at this point in the history
Auto-expands replicas in the same cluster state update (instead of a follow-up reroute) where nodes are added or removed.

Closes #1873, fixing an issue where nodes drop their copy of auto-expanded data when coming up, only to sync it again later.
  • Loading branch information
ywelsch authored May 7, 2018
1 parent 44c6dcf commit 82b251a
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 119 deletions.
6 changes: 6 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ Rollup::
* Validate timezone in range queries to ensure they match the selected job when
searching ({pull}30338[#30338])


Allocation::

Auto-expand replicas when adding or removing nodes to prevent shard copies from
being dropped and resynced when a data node rejoins the cluster ({pull}30423[#30423])

//[float]
//=== Regressions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,36 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* This class acts as a functional wrapper around the {@code index.auto_expand_replicas} setting.
* This setting or rather it's value is expanded into a min and max value which requires special handling
* based on the number of datanodes in the cluster. This class handles all the parsing and streamlines the access to these values.
*/
final class AutoExpandReplicas {
public final class AutoExpandReplicas {
// the value we recognize in the "max" position to mean all the nodes
private static final String ALL_NODES_VALUE = "all";
public static final Setting<AutoExpandReplicas> SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false", (value) -> {

private static final AutoExpandReplicas FALSE_INSTANCE = new AutoExpandReplicas(0, 0, false);

public static final Setting<AutoExpandReplicas> SETTING = new Setting<>(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "false",
AutoExpandReplicas::parse, Property.Dynamic, Property.IndexScope);

private static AutoExpandReplicas parse(String value) {
final int min;
final int max;
if (Booleans.isFalse(value)) {
return new AutoExpandReplicas(0, 0, false);
return FALSE_INSTANCE;
}
final int dash = value.indexOf('-');
if (-1 == dash) {
Expand All @@ -57,7 +70,7 @@ final class AutoExpandReplicas {
}
}
return new AutoExpandReplicas(min, max, true);
}, Property.Dynamic, Property.IndexScope);
}

private final int minReplicas;
private final int maxReplicas;
Expand All @@ -80,6 +93,24 @@ int getMaxReplicas(int numDataNodes) {
return Math.min(maxReplicas, numDataNodes-1);
}

Optional<Integer> getDesiredNumberOfReplicas(int numDataNodes) {
if (enabled) {
final int min = getMinReplicas();
final int max = getMaxReplicas(numDataNodes);
int numberOfReplicas = numDataNodes - 1;
if (numberOfReplicas < min) {
numberOfReplicas = min;
} else if (numberOfReplicas > max) {
numberOfReplicas = max;
}

if (numberOfReplicas >= min && numberOfReplicas <= max) {
return Optional.of(numberOfReplicas);
}
}
return Optional.empty();
}

@Override
public String toString() {
return enabled ? minReplicas + "-" + maxReplicas : "false";
Expand All @@ -88,6 +119,31 @@ public String toString() {
boolean isEnabled() {
return enabled;
}

/**
* Checks if the are replicas with the auto-expand feature that need to be adapted.
* Returns a map of updates, which maps the indices to be updated to the desired number of replicas.
* The map has the desired number of replicas as key and the indices to update as value, as this allows the result
* of this method to be directly applied to RoutingTable.Builder#updateNumberOfReplicas.
*/
public static Map<Integer, List<String>> getAutoExpandReplicaChanges(MetaData metaData, DiscoveryNodes discoveryNodes) {
// used for translating "all" to a number
final int dataNodeCount = discoveryNodes.getDataNodes().size();

Map<Integer, List<String>> nrReplicasChanged = new HashMap<>();

for (final IndexMetaData indexMetaData : metaData) {
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
AutoExpandReplicas autoExpandReplicas = SETTING.get(indexMetaData.getSettings());
autoExpandReplicas.getDesiredNumberOfReplicas(dataNodeCount).ifPresent(numberOfReplicas -> {
if (numberOfReplicas != indexMetaData.getNumberOfReplicas()) {
nrReplicasChanged.computeIfAbsent(numberOfReplicas, ArrayList::new).add(indexMetaData.getIndex().getName());
}
});
}
}
return nrReplicasChanged;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -42,16 +40,12 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
Expand All @@ -61,7 +55,7 @@
/**
* Service responsible for submitting update index settings requests
*/
public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener {
public class MetaDataUpdateSettingsService extends AbstractComponent {

private final ClusterService clusterService;

Expand All @@ -77,87 +71,11 @@ public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterSe
super(settings);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clusterService.addListener(this);
this.allocationService = allocationService;
this.indexScopedSettings = indexScopedSettings;
this.indicesService = indicesService;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
// update an index with number of replicas based on data nodes if possible
if (!event.state().nodes().isLocalNodeElectedMaster()) {
return;
}
// we will want to know this for translating "all" to a number
final int dataNodeCount = event.state().nodes().getDataNodes().size();

Map<Integer, List<Index>> nrReplicasChanged = new HashMap<>();
// we need to do this each time in case it was changed by update settings
for (final IndexMetaData indexMetaData : event.state().metaData()) {
AutoExpandReplicas autoExpandReplicas = IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetaData.getSettings());
if (autoExpandReplicas.isEnabled()) {
/*
* we have to expand the number of replicas for this index to at least min and at most max nodes here
* so we are bumping it up if we have to or reduce it depending on min/max and the number of datanodes.
* If we change the number of replicas we just let the shard allocator do it's thing once we updated it
* since it goes through the index metadata to figure out if something needs to be done anyway. Do do that
* we issue a cluster settings update command below and kicks off a reroute.
*/
final int min = autoExpandReplicas.getMinReplicas();
final int max = autoExpandReplicas.getMaxReplicas(dataNodeCount);
int numberOfReplicas = dataNodeCount - 1;
if (numberOfReplicas < min) {
numberOfReplicas = min;
} else if (numberOfReplicas > max) {
numberOfReplicas = max;
}
// same value, nothing to do there
if (numberOfReplicas == indexMetaData.getNumberOfReplicas()) {
continue;
}

if (numberOfReplicas >= min && numberOfReplicas <= max) {

if (!nrReplicasChanged.containsKey(numberOfReplicas)) {
nrReplicasChanged.put(numberOfReplicas, new ArrayList<>());
}

nrReplicasChanged.get(numberOfReplicas).add(indexMetaData.getIndex());
}
}
}

if (nrReplicasChanged.size() > 0) {
// update settings and kick of a reroute (implicit) for them to take effect
for (final Integer fNumberOfReplicas : nrReplicasChanged.keySet()) {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build();
final List<Index> indices = nrReplicasChanged.get(fNumberOfReplicas);

UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest()
.indices(indices.toArray(new Index[indices.size()])).settings(settings)
.ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here
.masterNodeTimeout(TimeValue.timeValueMinutes(10));

updateSettings(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
for (Index index : indices) {
logger.info("{} auto expanded replicas to [{}]", index, fNumberOfReplicas);
}
}

@Override
public void onFailure(Exception t) {
for (Index index : indices) {
logger.warn("{} fail to auto expand replicas to [{}]", index, fNumberOfReplicas);
}
}
});
}
}
}

public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
final Settings normalizedSettings = Settings.builder().put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
Settings.Builder settingsForClosedIndices = Settings.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
Expand All @@ -46,6 +47,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -206,11 +208,12 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
* if needed.
*/
public ClusterState deassociateDeadNodes(final ClusterState clusterState, boolean reroute, String reason) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());

// first, clear from the shards any node id they used to belong to that is now dead
Expand All @@ -220,12 +223,40 @@ public ClusterState deassociateDeadNodes(final ClusterState clusterState, boolea
reroute(allocation);
}

if (allocation.routingNodesChanged() == false) {
if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}

/**
* Checks if the are replicas with the auto-expand feature that need to be adapted.
* Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
*/
private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
final Map<Integer, List<String>> autoExpandReplicaChanges =
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
if (autoExpandReplicaChanges.isEmpty()) {
return clusterState;
} else {
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable());
final MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());
for (Map.Entry<Integer, List<String>> entry : autoExpandReplicaChanges.entrySet()) {
final int numberOfReplicas = entry.getKey();
final String[] indices = entry.getValue().toArray(new String[entry.getValue().size()]);
// we do *not* update the in sync allocation ids as they will be removed upon the first index
// operation which make these copies stale
routingTableBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
metaDataBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
logger.info("updating number_of_replicas to [{}] for indices {}", numberOfReplicas, indices);
}
final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build())
.metaData(metaDataBuilder).build();
assert AutoExpandReplicas.getAutoExpandReplicaChanges(fixedState.metaData(), fixedState.nodes()).isEmpty();
return fixedState;
}
}

/**
* Removes delay markers from unassigned shards based on current time stamp.
*/
Expand Down Expand Up @@ -301,6 +332,7 @@ public CommandsResult reroute(final ClusterState clusterState, AllocationCommand
if (retryFailed) {
resetFailedAllocationCounter(allocation);
}

reroute(allocation);
return new CommandsResult(explanations, buildResultAndLogHealthChange(clusterState, allocation, "reroute commands"));
}
Expand All @@ -320,15 +352,17 @@ public ClusterState reroute(ClusterState clusterState, String reason) {
* <p>
* If the same instance of ClusterState is returned, then no change has been made.
*/
protected ClusterState reroute(final ClusterState clusterState, String reason, boolean debug) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);

RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());
allocation.debugDecision(debug);
reroute(allocation);
if (allocation.routingNodesChanged() == false) {
if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
Expand All @@ -353,6 +387,8 @@ private boolean hasDeadNodes(RoutingAllocation allocation) {

private void reroute(RoutingAllocation allocation) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
"auto-expand replicas out of sync with number of nodes in the cluster";

// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
Expand Down
Loading

0 comments on commit 82b251a

Please sign in to comment.