Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit retries of failed allocations per index #18467

Merged
merged 12 commits into from
May 20, 2016
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class ClusterModule extends AbstractModule {
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
Collections.unmodifiableList(Arrays.asList(
MaxRetryAllocationDecider.class,
SameShardAllocationDecider.class,
FilterAllocationDecider.class,
ReplicaAfterPrimaryActiveAllocationDecider.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public final class UnassignedInfo implements ToXContent, Writeable {
public static final Setting<TimeValue> INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING =
Setting.timeSetting("index.unassigned.node_left.delayed_timeout", DEFAULT_DELAYED_NODE_LEFT_TIMEOUT, Property.Dynamic,
Property.IndexScope);

/**
* Reason why the shard is in unassigned state.
* <p>
Expand Down Expand Up @@ -103,7 +102,11 @@ public enum Reason {
/**
* A better replica location is identified and causes the existing replica allocation to be cancelled.
*/
REALLOCATED_REPLICA;
REALLOCATED_REPLICA,
/**
* Unassigned as a result of a failed primary while the replica was initializing.
*/
PRIMARY_FAILED;
}

private final Reason reason;
Expand All @@ -112,6 +115,7 @@ public enum Reason {
private final long lastComputedLeftDelayNanos; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
private final String message;
private final Throwable failure;
private final int failedAllocations;

/**
* creates an UnassingedInfo object based **current** time
Expand All @@ -120,7 +124,7 @@ public enum Reason {
* @param message more information about cause.
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, message, null, System.nanoTime(), System.currentTimeMillis());
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis());
}

/**
Expand All @@ -130,13 +134,16 @@ public UnassignedInfo(Reason reason, String message) {
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) {
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis) {
this.reason = reason;
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
this.lastComputedLeftDelayNanos = 0L;
this.message = message;
this.failure = failure;
this.failedAllocations = failedAllocations;
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED):
"failedAllocations: " + failedAllocations + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
}

Expand All @@ -147,17 +154,19 @@ public UnassignedInfo(UnassignedInfo unassignedInfo, long newComputedLeftDelayNa
this.lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
this.message = unassignedInfo.message;
this.failure = unassignedInfo.failure;
this.failedAllocations = unassignedInfo.failedAllocations;
}

public UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.unassignedTimeMillis = in.readLong();
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
// This means that in master failover situations, elapsed delay time is forgotten.
// This means that in master fail-over situations, elapsed delay time is forgotten.
this.unassignedTimeNanos = System.nanoTime();
this.lastComputedLeftDelayNanos = 0L;
this.message = in.readOptionalString();
this.failure = in.readThrowable();
this.failedAllocations = in.readVInt();
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -166,12 +175,18 @@ public void writeTo(StreamOutput out) throws IOException {
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
out.writeOptionalString(message);
out.writeThrowable(failure);
out.writeVInt(failedAllocations);
}

public UnassignedInfo readFrom(StreamInput in) throws IOException {
return new UnassignedInfo(in);
}

/**
* Returns the number of previously failed allocations of this shard.
*/
public int getNumFailedAllocations() { return failedAllocations; }

/**
* The reason why the shard is unassigned.
*/
Expand Down Expand Up @@ -325,7 +340,11 @@ public String shortSummary() {
StringBuilder sb = new StringBuilder();
sb.append("[reason=").append(reason).append("]");
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)).append("]");
if (failedAllocations > 0) {
sb.append(", failed_attempts[").append(failedAllocations).append("]");
}
String details = getDetails();

if (details != null) {
sb.append(", details[").append(details).append("]");
}
Expand All @@ -342,6 +361,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject("unassigned_info");
builder.field("reason", reason);
builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis));
if (failedAllocations > 0) {
builder.field("failed_attempts", failedAllocations);
}
String details = getDetails();
if (details != null) {
builder.field("details", details);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, Lis
List<FailedRerouteAllocation.FailedShard> orderedFailedShards = new ArrayList<>(failedShards);
orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.shard.primary()));
for (FailedRerouteAllocation.FailedShard failedShard : orderedFailedShards) {
UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo();
final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0;
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
System.nanoTime(), System.currentTimeMillis()));
failedAllocations + 1, System.nanoTime(), System.currentTimeMillis()));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
Expand Down Expand Up @@ -437,7 +439,7 @@ private boolean deassociateDeadNodes(RoutingAllocation allocation) {
// now, go over all the shards routing on the node, and fail them
for (ShardRouting shardRouting : node.copyShards()) {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
allocation.getCurrentNanoTime(), System.currentTimeMillis());
0, allocation.getCurrentNanoTime(), System.currentTimeMillis());
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
Expand All @@ -457,8 +459,8 @@ private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, S
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
// we need to move the unassigned info back to treat it as if it was index creation
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
shardRouting.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis());
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis());
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

/**
* An allocation decider that prevents shards from being allocated on any node if the shards allocation has been retried N times without
* success. This means if a shard has been INITIALIZING N times in a row without being moved to STARTED the shard will be ignored until
* the setting for <tt>index.allocation.max_retry</tt> is raised. The default value is <tt>5</tt>.
*/
public class MaxRetryAllocationDecider extends AllocationDecider {

public static final Setting<Integer> SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries", 5, 0,
Setting.Property.Dynamic, Setting.Property.IndexScope);

public static final String NAME = "max_retry";

/**
* Initializes a new {@link MaxRetryAllocationDecider}
*
* @param settings {@link Settings} used by this {@link AllocationDecider}
*/
@Inject
public MaxRetryAllocationDecider(Settings settings) {
super(settings);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
if (unassignedInfo != null && unassignedInfo.getNumFailedAllocations() > 0) {
IndexMetaData indexSafe = allocation.metaData().getIndexSafe(shardRouting.index());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just call this variable indexMetaData?

int maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(indexSafe.getSettings());
if (unassignedInfo.getNumFailedAllocations() >= maxRetry) {
return allocation.decision(Decision.NO, NAME, "shard has already failed allocating ["
+ unassignedInfo.getNumFailedAllocations() + "] times " + unassignedInfo.toString());
}
}
return allocation.decision(Decision.YES, NAME, "shard has no previous failures");
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canAllocate(shardRouting, allocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.gateway.PrimaryShardAllocator;
Expand All @@ -40,7 +41,6 @@
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.IndexWarmer;
import org.elasticsearch.indices.IndicesRequestCache;

import java.util.Arrays;
Expand All @@ -59,6 +59,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
public static final Predicate<String> INDEX_SETTINGS_KEY_PREDICATE = (s) -> s.startsWith(IndexMetaData.INDEX_SETTING_PREFIX);

public static final Set<Setting<?>> BUILT_IN_INDEX_SETTINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY,
IndexSettings.INDEX_TTL_DISABLE_PURGE_SETTING,
IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING,
IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public boolean processExistingRecoveries(RoutingAllocation allocation) {
currentNode, nodeWithHighestMatch);
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
changed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,19 @@ public void testReasonOrdinalOrder() {
UnassignedInfo.Reason.NODE_LEFT,
UnassignedInfo.Reason.REROUTE_CANCELLED,
UnassignedInfo.Reason.REINITIALIZED,
UnassignedInfo.Reason.REALLOCATED_REPLICA};
UnassignedInfo.Reason.REALLOCATED_REPLICA,
UnassignedInfo.Reason.PRIMARY_FAILED};
for (int i = 0; i < order.length; i++) {
assertThat(order[i].ordinal(), equalTo(i));
}
assertThat(UnassignedInfo.Reason.values().length, equalTo(order.length));
}

public void testSerialization() throws Exception {
UnassignedInfo meta = new UnassignedInfo(RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()), randomBoolean() ? randomAsciiOfLength(4) : null);
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values());
UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis()):
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null);
BytesStreamOutput out = new BytesStreamOutput();
meta.writeTo(out);
out.close();
Expand All @@ -82,6 +86,7 @@ public void testSerialization() throws Exception {
assertThat(read.getUnassignedTimeInMillis(), equalTo(meta.getUnassignedTimeInMillis()));
assertThat(read.getMessage(), equalTo(meta.getMessage()));
assertThat(read.getDetails(), equalTo(meta.getDetails()));
assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
}

public void testIndexCreated() {
Expand Down Expand Up @@ -273,7 +278,10 @@ public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception {
public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception {
EnumSet<UnassignedInfo.Reason> reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(random(), reasons), null);
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), reasons);
UnassignedInfo unassignedInfo = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
new UnassignedInfo(reason, null, null, 1, System.nanoTime(), System.currentTimeMillis()):
new UnassignedInfo(reason, null);
unassignedInfo = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10h").build(), Settings.EMPTY);
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
Expand All @@ -287,7 +295,7 @@ public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception
*/
public void testLeftDelayCalculation() throws Exception {
final long baseTime = System.nanoTime();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, baseTime, System.currentTimeMillis());
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, System.currentTimeMillis());
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();
unassignedInfo = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY);
Expand Down
Loading