Skip to content

Commit

Permalink
Delay shard reassignment from nodes which are known to be restarting (#…
Browse files Browse the repository at this point in the history
…75606)

This PR makes the delayed allocation infrastructure aware of registered node shutdowns, so that reallocation of shards will be further delayed for nodes which are known to be restarting.

To make this more configurable, the Node Shutdown APIs now support a `allocation_delay` parameter, which defaults to 5 minutes. For example:
```
PUT /_nodes/USpTGYaBSIKbgSUJR2Z9lg/shutdown
{
  "type": "restart",
  "reason": "Demonstrating how the node shutdown API works",
  "allocation_delay": "20m"
}
```

Will cause reallocation of shards assigned to that node to another node to be delayed by 20 minutes. Note that this delay will only be used if it's *longer* than the index-level allocation delay, set via `index.unassigned.node_left.delayed_timeout`.

The `allocation_delay` parameter is only valid for `restart`-type shutdown registrations, and the request will be rejected if it's used with another shutdown type.
  • Loading branch information
gwbrown authored Aug 16, 2021
1 parent 189f650 commit 58f66cf
Show file tree
Hide file tree
Showing 27 changed files with 935 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
Expand Down Expand Up @@ -58,7 +59,7 @@
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.AliasMetadata.newAliasMetadataBuilder;
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomChange;
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomReason;
import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
Expand Down Expand Up @@ -270,7 +271,7 @@ private IndexRoutingTable randomIndexRoutingTable(String index, String[] nodeIds
for (int j = 0; j < replicaCount; j++) {
UnassignedInfo unassignedInfo = null;
if (randomInt(5) == 1) {
unassignedInfo = new UnassignedInfo(randomReason(), randomAlphaOfLength(10));
unassignedInfo = randomUnassignedInfo(randomAlphaOfLength(10));
}
if (availableNodeIds.isEmpty()) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,12 @@ public Map<String, DataStreamAlias> dataStreamAliases() {
.orElse(Collections.emptyMap());
}

public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
return Optional.ofNullable((NodesShutdownMetadata) this.custom(NodesShutdownMetadata.TYPE))
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
.orElse(Collections.emptyMap());
}

public ImmutableOpenMap<String, Custom> customs() {
return this.customs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Locale;
Expand All @@ -35,14 +38,16 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable<SingleNodeShutd
public static final ParseField REASON_FIELD = new ParseField("reason");
public static final String STARTED_AT_READABLE_FIELD = "shutdown_started";
public static final ParseField STARTED_AT_MILLIS_FIELD = new ParseField(STARTED_AT_READABLE_FIELD + "millis");
public static final ParseField ALLOCATION_DELAY_FIELD = new ParseField("allocation_delay");

public static final ConstructingObjectParser<SingleNodeShutdownMetadata, Void> PARSER = new ConstructingObjectParser<>(
"node_shutdown_info",
a -> new SingleNodeShutdownMetadata(
(String) a[0],
Type.valueOf((String) a[1]),
(String) a[2],
(long) a[3]
(long) a[3],
(TimeValue) a[4]
)
);

Expand All @@ -51,16 +56,24 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable<SingleNodeShutd
PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), REASON_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), STARTED_AT_MILLIS_FIELD);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), ALLOCATION_DELAY_FIELD.getPreferredName()), ALLOCATION_DELAY_FIELD,
ObjectParser.ValueType.STRING_OR_NULL
);
}

public static SingleNodeShutdownMetadata parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

public static final TimeValue DEFAULT_RESTART_SHARD_ALLOCATION_DELAY = TimeValue.timeValueMinutes(5);

private final String nodeId;
private final Type type;
private final String reason;
private final long startedAtMillis;
@Nullable private final TimeValue allocationDelay;

/**
* @param nodeId The node ID that this shutdown metadata refers to.
Expand All @@ -72,19 +85,25 @@ private SingleNodeShutdownMetadata(
String nodeId,
Type type,
String reason,
long startedAtMillis
long startedAtMillis,
@Nullable TimeValue allocationDelay
) {
this.nodeId = Objects.requireNonNull(nodeId, "node ID must not be null");
this.type = Objects.requireNonNull(type, "shutdown type must not be null");
this.reason = Objects.requireNonNull(reason, "shutdown reason must not be null");
this.startedAtMillis = startedAtMillis;
if (allocationDelay != null && Type.RESTART.equals(type) == false) {
throw new IllegalArgumentException("shard allocation delay is only valid for RESTART-type shutdowns");
}
this.allocationDelay = allocationDelay;
}

public SingleNodeShutdownMetadata(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.type = in.readEnum(Type.class);
this.reason = in.readString();
this.startedAtMillis = in.readVLong();
this.allocationDelay = in.readOptionalTimeValue();
}

/**
Expand Down Expand Up @@ -115,12 +134,27 @@ public long getStartedAtMillis() {
return startedAtMillis;
}

/**
* @return The amount of time shard reallocation should be delayed for shards on this node, so that they will not be automatically
* reassigned while the node is restarting. Will be {@code null} for non-restart shutdowns.
*/
@Nullable
public TimeValue getAllocationDelay() {
if (allocationDelay != null) {
return allocationDelay;
} else if (Type.RESTART.equals(type)) {
return DEFAULT_RESTART_SHARD_ALLOCATION_DELAY;
}
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeEnum(type);
out.writeString(reason);
out.writeVLong(startedAtMillis);
out.writeOptionalTimeValue(allocationDelay);
}

@Override
Expand All @@ -131,6 +165,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(TYPE_FIELD.getPreferredName(), type);
builder.field(REASON_FIELD.getPreferredName(), reason);
builder.timeField(STARTED_AT_MILLIS_FIELD.getPreferredName(), STARTED_AT_READABLE_FIELD, startedAtMillis);
if (allocationDelay != null) {
builder.field(ALLOCATION_DELAY_FIELD.getPreferredName(), allocationDelay.getStringRep());
}
}
builder.endObject();

Expand All @@ -145,7 +182,8 @@ public boolean equals(Object o) {
return getStartedAtMillis() == that.getStartedAtMillis()
&& getNodeId().equals(that.getNodeId())
&& getType() == that.getType()
&& getReason().equals(that.getReason());
&& getReason().equals(that.getReason())
&& Objects.equals(allocationDelay, that.allocationDelay);
}

@Override
Expand All @@ -154,7 +192,8 @@ public int hashCode() {
getNodeId(),
getType(),
getReason(),
getStartedAtMillis()
getStartedAtMillis(),
allocationDelay
);
}

Expand All @@ -178,6 +217,7 @@ public static class Builder {
private Type type;
private String reason;
private long startedAtMillis = -1;
private TimeValue allocationDelay;

private Builder() {}

Expand Down Expand Up @@ -217,15 +257,25 @@ public Builder setStartedAtMillis(long startedAtMillis) {
return this;
}

/**
* @param allocationDelay The amount of time shard reallocation should be delayed while this node is offline.
* @return This builder.
*/
public Builder setAllocationDelay(TimeValue allocationDelay) {
this.allocationDelay = allocationDelay;
return this;
}

public SingleNodeShutdownMetadata build() {
if (startedAtMillis == -1) {
throw new IllegalArgumentException("start timestamp must be set");
}

return new SingleNodeShutdownMetadata(
nodeId,
type,
reason,
startedAtMillis
startedAtMillis, allocationDelay
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
package org.elasticsearch.cluster.routing;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -65,6 +67,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private final Map<ShardId, List<ShardRouting>> assignedShards = new HashMap<>();

private final Map<String, SingleNodeShutdownMetadata> nodeShutdowns;

private final boolean readOnly;

private int inactivePrimaryCount = 0;
Expand All @@ -83,6 +87,7 @@ public RoutingNodes(ClusterState clusterState) {
public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
nodeShutdowns = clusterState.metadata().nodeShutdowns();

Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>();
// fill in the nodeToShards with the "live" nodes
Expand Down Expand Up @@ -533,9 +538,17 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
// re-resolve replica as earlier iteration could have changed source/target of replica relocation
ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet());
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing",
null,
0,
unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(),
false,
AllocationStatus.NO_ATTEMPT,
Collections.emptySet(),
routing.currentNodeId());
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetadata, routingChangesObserver);
}
}
Expand Down Expand Up @@ -858,10 +871,17 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
UnassignedInfo currInfo = shard.unassignedInfo();
assert currInfo != null;
if (allocationStatus.equals(currInfo.getLastAllocationStatus()) == false) {
UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(),
currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(),
allocationStatus, currInfo.getFailedNodeIds());
UnassignedInfo newInfo = new UnassignedInfo(
currInfo.getReason(),
currInfo.getMessage(),
currInfo.getFailure(),
currInfo.getNumFailedAllocations(),
currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(),
currInfo.isDelayed(),
allocationStatus,
currInfo.getFailedNodeIds(),
currInfo.getLastAllocatedNodeId());
ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource());
changes.unassignedInfoUpdated(shard, newInfo);
shard = updatedShard;
Expand Down
Loading

0 comments on commit 58f66cf

Please sign in to comment.