From 58f66cf04a2fd3afcaa88e879e8dca991fb7a8c1 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Mon, 16 Aug 2021 15:59:50 -0600 Subject: [PATCH] Delay shard reassignment from nodes which are known to be restarting (#75606) This PR makes the delayed allocation infrastructure aware of registered node shutdowns, so that reallocation of shards will be further delayed for nodes which are known to be restarting. To make this more configurable, the Node Shutdown APIs now support an `allocation_delay` parameter, which defaults to 5 minutes. For example: ``` PUT /_nodes/USpTGYaBSIKbgSUJR2Z9lg/shutdown { "type": "restart", "reason": "Demonstrating how the node shutdown API works", "allocation_delay": "20m" } ``` This will cause the reallocation of shards assigned to that node to other nodes to be delayed by 20 minutes. Note that this delay will only be used if it's *longer* than the index-level allocation delay, set via `index.unassigned.node_left.delayed_timeout`. The `allocation_delay` parameter is only valid for `restart`-type shutdown registrations, and the request will be rejected if it's used with another shutdown type. 
--- .../cluster/ClusterStateDiffIT.java | 5 +- .../cluster/metadata/Metadata.java | 6 + .../metadata/NodesShutdownMetadata.java | 2 +- .../metadata/SingleNodeShutdownMetadata.java | 62 ++++- .../cluster/routing/RoutingNodes.java | 36 ++- .../cluster/routing/UnassignedInfo.java | 123 +++++++-- .../routing/allocation/AllocationService.java | 80 ++++-- .../allocator/BalancedShardsAllocator.java | 21 +- ...AllocateEmptyPrimaryAllocationCommand.java | 2 +- .../gateway/ReplicaShardAllocator.java | 13 +- .../metadata/NodesShutdownMetadataTests.java | 14 +- .../cluster/routing/AllocationIdTests.java | 16 +- .../routing/RandomShardRoutingMutator.java | 31 +-- .../cluster/routing/UnassignedInfoTests.java | 251 ++++++++++++++++-- ...storeInProgressAllocationDeciderTests.java | 14 +- .../gateway/PriorityComparatorTests.java | 17 +- .../gateway/ReplicaShardAllocatorTests.java | 35 ++- .../cluster/routing/TestShardRouting.java | 36 ++- .../core/ilm/CheckShrinkReadyStepTests.java | 27 +- .../ilm/DataTierMigrationRoutedStepTests.java | 6 +- .../ml/integration/MlNodeShutdownIT.java | 23 +- .../indices/IndexStatsMonitoringDocTests.java | 4 +- .../NodeShutdownDelayedAllocationIT.java | 235 ++++++++++++++++ .../xpack/shutdown/NodeShutdownPluginsIT.java | 2 +- .../xpack/shutdown/NodeShutdownTasksIT.java | 2 +- .../xpack/shutdown/PutShutdownNodeAction.java | 25 +- .../TransportPutShutdownNodeAction.java | 1 + 27 files changed, 935 insertions(+), 154 deletions(-) create mode 100644 x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index cf4de829d559c..600770ea5f163 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ 
b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.CoordinationMetadata; @@ -58,7 +59,7 @@ import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.metadata.AliasMetadata.newAliasMetadataBuilder; import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomChange; -import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomReason; +import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @@ -270,7 +271,7 @@ private IndexRoutingTable randomIndexRoutingTable(String index, String[] nodeIds for (int j = 0; j < replicaCount; j++) { UnassignedInfo unassignedInfo = null; if (randomInt(5) == 1) { - unassignedInfo = new UnassignedInfo(randomReason(), randomAlphaOfLength(10)); + unassignedInfo = randomUnassignedInfo(randomAlphaOfLength(10)); } if (availableNodeIds.isEmpty()) { break; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 8d559824966a0..267db7de99ced 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -719,6 +719,12 @@ public Map dataStreamAliases() { .orElse(Collections.emptyMap()); } + public Map nodeShutdowns() { + return Optional.ofNullable((NodesShutdownMetadata) this.custom(NodesShutdownMetadata.TYPE)) + 
.map(NodesShutdownMetadata::getAllNodeMetadataMap) + .orElse(Collections.emptyMap()); + } + public ImmutableOpenMap customs() { return this.customs; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java index a52d27f56a325..59d2743a189be 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java @@ -14,10 +14,10 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.NamedDiff; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java index bfe73c4d409d9..200878577b411 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java @@ -10,13 +10,16 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diffable; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ParseField; import 
org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import java.io.IOException; import java.util.Locale; @@ -35,6 +38,7 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable PARSER = new ConstructingObjectParser<>( "node_shutdown_info", @@ -42,7 +46,8 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable TimeValue.parseTimeValue(p.textOrNull(), ALLOCATION_DELAY_FIELD.getPreferredName()), ALLOCATION_DELAY_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); } public static SingleNodeShutdownMetadata parse(XContentParser parser) { return PARSER.apply(parser, null); } + public static final TimeValue DEFAULT_RESTART_SHARD_ALLOCATION_DELAY = TimeValue.timeValueMinutes(5); + private final String nodeId; private final Type type; private final String reason; private final long startedAtMillis; + @Nullable private final TimeValue allocationDelay; /** * @param nodeId The node ID that this shutdown metadata refers to. 
@@ -72,12 +85,17 @@ private SingleNodeShutdownMetadata( String nodeId, Type type, String reason, - long startedAtMillis + long startedAtMillis, + @Nullable TimeValue allocationDelay ) { this.nodeId = Objects.requireNonNull(nodeId, "node ID must not be null"); this.type = Objects.requireNonNull(type, "shutdown type must not be null"); this.reason = Objects.requireNonNull(reason, "shutdown reason must not be null"); this.startedAtMillis = startedAtMillis; + if (allocationDelay != null && Type.RESTART.equals(type) == false) { + throw new IllegalArgumentException("shard allocation delay is only valid for RESTART-type shutdowns"); + } + this.allocationDelay = allocationDelay; } public SingleNodeShutdownMetadata(StreamInput in) throws IOException { @@ -85,6 +103,7 @@ public SingleNodeShutdownMetadata(StreamInput in) throws IOException { this.type = in.readEnum(Type.class); this.reason = in.readString(); this.startedAtMillis = in.readVLong(); + this.allocationDelay = in.readOptionalTimeValue(); } /** @@ -115,12 +134,27 @@ public long getStartedAtMillis() { return startedAtMillis; } + /** + * @return The amount of time shard reallocation should be delayed for shards on this node, so that they will not be automatically + * reassigned while the node is restarting. Will be {@code null} for non-restart shutdowns. 
+ */ + @Nullable + public TimeValue getAllocationDelay() { + if (allocationDelay != null) { + return allocationDelay; + } else if (Type.RESTART.equals(type)) { + return DEFAULT_RESTART_SHARD_ALLOCATION_DELAY; + } + return null; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(nodeId); out.writeEnum(type); out.writeString(reason); out.writeVLong(startedAtMillis); + out.writeOptionalTimeValue(allocationDelay); } @Override @@ -131,6 +165,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(TYPE_FIELD.getPreferredName(), type); builder.field(REASON_FIELD.getPreferredName(), reason); builder.timeField(STARTED_AT_MILLIS_FIELD.getPreferredName(), STARTED_AT_READABLE_FIELD, startedAtMillis); + if (allocationDelay != null) { + builder.field(ALLOCATION_DELAY_FIELD.getPreferredName(), allocationDelay.getStringRep()); + } } builder.endObject(); @@ -145,7 +182,8 @@ public boolean equals(Object o) { return getStartedAtMillis() == that.getStartedAtMillis() && getNodeId().equals(that.getNodeId()) && getType() == that.getType() - && getReason().equals(that.getReason()); + && getReason().equals(that.getReason()) + && Objects.equals(allocationDelay, that.allocationDelay); } @Override @@ -154,7 +192,8 @@ public int hashCode() { getNodeId(), getType(), getReason(), - getStartedAtMillis() + getStartedAtMillis(), + allocationDelay ); } @@ -178,6 +217,7 @@ public static class Builder { private Type type; private String reason; private long startedAtMillis = -1; + private TimeValue allocationDelay; private Builder() {} @@ -217,15 +257,25 @@ public Builder setStartedAtMillis(long startedAtMillis) { return this; } + /** + * @param allocationDelay The amount of time shard reallocation should be delayed while this node is offline. + * @return This builder. 
+ */ + public Builder setAllocationDelay(TimeValue allocationDelay) { + this.allocationDelay = allocationDelay; + return this; + } + public SingleNodeShutdownMetadata build() { if (startedAtMillis == -1) { throw new IllegalArgumentException("start timestamp must be set"); } + return new SingleNodeShutdownMetadata( nodeId, type, reason, - startedAtMillis + startedAtMillis, allocationDelay ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 78bade7493875..b207776d49260 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -9,18 +9,20 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.Assertions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Randomness; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -65,6 +67,8 @@ public class RoutingNodes implements Iterable { private final Map> assignedShards = new HashMap<>(); + private final Map nodeShutdowns; + private final boolean readOnly; private int inactivePrimaryCount = 0; @@ -83,6 +87,7 @@ public 
RoutingNodes(ClusterState clusterState) { public RoutingNodes(ClusterState clusterState, boolean readOnly) { this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); + nodeShutdowns = clusterState.metadata().nodeShutdowns(); Map> nodesToShards = new HashMap<>(); // fill in the nodeToShards with the "live" nodes @@ -533,9 +538,17 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId // re-resolve replica as earlier iteration could have changed source/target of replica relocation ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId()); assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas"; - UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, - "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet()); + UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.PRIMARY_FAILED, + "primary failed while replica initializing", + null, + 0, + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + false, + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + routing.currentNodeId()); failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetadata, routingChangesObserver); } } @@ -858,10 +871,17 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo currInfo = shard.unassignedInfo(); assert currInfo != null; if (allocationStatus.equals(currInfo.getLastAllocationStatus()) == false) { - UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(), - currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), - currInfo.getUnassignedTimeInMillis(), 
currInfo.isDelayed(), - allocationStatus, currInfo.getFailedNodeIds()); + UnassignedInfo newInfo = new UnassignedInfo( + currInfo.getReason(), + currInfo.getMessage(), + currInfo.getFailure(), + currInfo.getNumFailedAllocations(), + currInfo.getUnassignedTimeInNanos(), + currInfo.getUnassignedTimeInMillis(), + currInfo.isDelayed(), + allocationStatus, + currInfo.getFailedNodeIds(), + currInfo.getLastAllocatedNodeId()); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index 7ef47eee9e7de..f8ed1f8444350 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -9,11 +9,12 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -21,9 +22,10 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.Nullable; +import 
org.elasticsearch.core.TimeValue; import java.io.IOException; import java.time.Instant; @@ -31,7 +33,9 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; /** @@ -39,6 +43,12 @@ */ public final class UnassignedInfo implements ToXContentFragment, Writeable { + /** + * The version that the {@code lastAllocatedNode} field was added in. Used to adapt streaming of this class as appropriate for the + * version of the node sending/receiving it. Should be removed once wire compatibility with this version is no longer necessary. + */ + private static final Version VERSION_LAST_ALLOCATED_NODE_ADDED = Version.V_8_0_0; + public static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC); public static final Setting INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING = @@ -114,7 +124,12 @@ public enum Reason { /** * Unassigned as a result of closing an index. */ - INDEX_CLOSED + INDEX_CLOSED, + /** + * Similar to NODE_LEFT, but at the time the node left, it had been registered for a restart via the Node Shutdown API. Note that + * there is no verification that it was ready to be restarted, so this may be an intentional restart or a node crash. + */ + NODE_RESTARTING } /** @@ -208,6 +223,7 @@ public String value() { private final int failedAllocations; private final Set failedNodeIds; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard + private final String lastAllocatedNodeId; /** * creates an UnassignedInfo object based on **current** time @@ -217,22 +233,32 @@ public String value() { **/ public UnassignedInfo(Reason reason, String message) { this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 
1 : 0, System.nanoTime(), System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT, Collections.emptySet()); + AllocationStatus.NO_ATTEMPT, Collections.emptySet(), null); } /** - * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. - * @param message more information about cause. - * @param failure the shard level failure that caused this shard to be unassigned, if exists. - * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation - * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. - * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. - * @param lastAllocationStatus the result of the last allocation attempt for this shard - * @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. 
+ * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard + * @param lastAllocatedNodeId the ID of the node this shard was last allocated to */ - public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, - long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, - Set failedNodeIds) { + public UnassignedInfo( + Reason reason, + @Nullable String message, + @Nullable Exception failure, + int failedAllocations, + long unassignedTimeNanos, + long unassignedTimeMillis, + boolean delayed, + AllocationStatus lastAllocationStatus, + Set failedNodeIds, + @Nullable String lastAllocatedNodeId + ) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -242,13 +268,23 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); this.failedNodeIds = Set.copyOf(failedNodeIds); - assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : - "failedAllocations: " + failedAllocations + " for reason " + reason; + this.lastAllocatedNodeId = lastAllocatedNodeId; + assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + + failedAllocations + + " for reason " + + reason; assert (message == null && failure != null) == false : "provide a message if a failure exception is provided"; - assert (delayed && reason != Reason.NODE_LEFT) == false : "shard can only be delayed if it is unassigned due to a node leaving"; + assert (delayed + && reason != Reason.NODE_LEFT + && reason != Reason.NODE_RESTARTING) == false : "shard can only be delayed if it is unassigned due to a node leaving"; 
+ // The below check should be expanded to require `lastAllocatedNodeId` for `NODE_LEFT` as well, once we no longer have to consider + // BWC with versions prior to `VERSION_LAST_ALLOCATED_NODE_ADDED`. + assert (reason == Reason.NODE_RESTARTING && lastAllocatedNodeId == null) == false + : "last allocated node ID must be set if the shard is unassigned due to a node restarting"; } public UnassignedInfo(StreamInput in) throws IOException { + // Because Reason.NODE_RESTARTING is new and can't be sent by older versions, there's no need to vary the deserialization behavior this.reason = Reason.values()[(int) in.readByte()]; this.unassignedTimeMillis = in.readLong(); // As System.nanoTime() cannot be compared across different JVMs, reset it to now. @@ -260,10 +296,19 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); + if (in.getVersion().onOrAfter(VERSION_LAST_ALLOCATED_NODE_ADDED)) { + this.lastAllocatedNodeId = in.readOptionalString(); + } else { + this.lastAllocatedNodeId = null; + } } public void writeTo(StreamOutput out) throws IOException { - out.writeByte((byte) reason.ordinal()); + if (reason.equals(Reason.NODE_RESTARTING) && out.getVersion().before(VERSION_LAST_ALLOCATED_NODE_ADDED)) { + out.writeByte((byte) Reason.NODE_LEFT.ordinal()); + } else { + out.writeByte((byte) reason.ordinal()); + } out.writeLong(unassignedTimeMillis); // Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs out.writeBoolean(delayed); @@ -272,6 +317,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); out.writeCollection(failedNodeIds, StreamOutput::writeString); + if (out.getVersion().onOrAfter(VERSION_LAST_ALLOCATED_NODE_ADDED)) { + 
out.writeOptionalString(lastAllocatedNodeId); + } } /** @@ -339,6 +387,14 @@ public String getDetails() { return message + (failure == null ? "" : ", failure " + ExceptionsHelper.stackTrace(failure)); } + /** + * Gets the ID of the node this shard was last allocated to, or null if unavailable. + */ + @Nullable + public String getLastAllocatedNodeId() { + return lastAllocatedNodeId; + } + /** * Get the status for the last allocation attempt for this shard. */ @@ -366,8 +422,21 @@ public Set getFailedNodeIds() { * * @return calculated delay in nanoseconds */ - public long getRemainingDelay(final long nanoTimeNow, final Settings indexSettings) { - long delayTimeoutNanos = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings).nanos(); + public long getRemainingDelay( + final long nanoTimeNow, + final Settings indexSettings, + final Map nodesShutdownMap + ) { + final long indexLevelDelay = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings).nanos(); + long delayTimeoutNanos = Optional.ofNullable(lastAllocatedNodeId) + // If the node wasn't restarting when this became unassigned, use default delay + .filter(nodeId -> reason.equals(Reason.NODE_RESTARTING)) + .map(nodesShutdownMap::get) + .filter(shutdownMetadata -> SingleNodeShutdownMetadata.Type.RESTART.equals(shutdownMetadata.getType())) + .map(SingleNodeShutdownMetadata::getAllocationDelay) + .map(TimeValue::nanos) + .map(knownRestartDelay -> Math.max(indexLevelDelay, knownRestartDelay)) + .orElse(indexLevelDelay); assert nanoTimeNow >= unassignedTimeNanos; return Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos)); } @@ -399,7 +468,11 @@ public static long findNextDelayedAllocation(long currentNanoTime, ClusterState if (unassignedInfo.isDelayed()) { Settings indexSettings = metadata.index(shard.index()).getSettings(); // calculate next time to schedule - final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(currentNanoTime, indexSettings); + final long 
newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay( + currentNanoTime, + indexSettings, + metadata.nodeShutdowns() + ); if (newComputedLeftDelayNanos < nextDelayNanos) { nextDelayNanos = newComputedLeftDelayNanos; } @@ -486,6 +559,11 @@ public boolean equals(Object o) { if (Objects.equals(failure, that.failure) == false) { return false; } + + if (Objects.equals(lastAllocatedNodeId, that.lastAllocatedNodeId) == false) { + return false; + } + return failedNodeIds.equals(that.failedNodeIds); } @@ -499,6 +577,7 @@ public int hashCode() { result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); result = 31 * result + failedNodeIds.hashCode(); + result = 31 * result + (lastAllocatedNodeId != null ? lastAllocatedNodeId.hashCode() : 0); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 5e7d008f1e6f2..948ec14bea434 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -19,6 +19,8 @@ import org.elasticsearch.cluster.metadata.AutoExpandReplicas; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingNode; @@ -45,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -209,7 +212,7 @@ public 
ClusterState applyFailedShards(final ClusterState clusterState, final Lis String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT, failedNodeIds); + AllocationStatus.NO_ATTEMPT, failedNodeIds, shardToFail.currentNodeId()); if (failedShardEntry.markAsStale()) { allocation.removeAllocationId(failedShard); } @@ -300,13 +303,27 @@ private void removeDelayMarkers(RoutingAllocation allocation) { ShardRouting shardRouting = unassignedIterator.next(); UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); if (unassignedInfo.isDelayed()) { - final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(allocation.getCurrentNanoTime(), - metadata.getIndexSafe(shardRouting.index()).getSettings()); + final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay( + allocation.getCurrentNanoTime(), + metadata.getIndexSafe(shardRouting.index()).getSettings(), + metadata.nodeShutdowns() + ); if (newComputedLeftDelayNanos == 0) { - unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), - unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), - unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + unassignedInfo.getNumFailedAllocations(), + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + false, + 
unassignedInfo.getLastAllocationStatus(), + unassignedInfo.getFailedNodeIds(), + unassignedInfo.getLastAllocatedNodeId()), + shardRouting.recoverySource(), + allocation.changes() + ); } } } @@ -320,11 +337,21 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { while (unassignedIterator.hasNext()) { ShardRouting shardRouting = unassignedIterator.next(); UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); - unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getNumFailedAllocations() > 0 ? - UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), - unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getNumFailedAllocations() > 0 ? 
UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + 0, + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + unassignedInfo.isDelayed(), + unassignedInfo.getLastAllocationStatus(), + Collections.emptySet(), + unassignedInfo.getLastAllocatedNodeId()), + shardRouting.recoverySource(), + allocation.changes() + ); } } @@ -460,19 +487,40 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } private void disassociateDeadNodes(RoutingAllocation allocation) { + Map nodesShutdownMetadata = allocation.metadata().nodeShutdowns(); + for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) { RoutingNode node = it.next(); if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) { // its a live node, continue continue; } + final UnassignedInfo.Reason + unassignedReason = + nodesShutdownMetadata.containsKey(node.nodeId()) ? 
+ UnassignedInfo.Reason.NODE_RESTARTING : + UnassignedInfo.Reason.NODE_LEFT; // now, go over all the shards routing on the node, and fail them for (ShardRouting shardRouting : node.copyShards()) { final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); - boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).nanos() > 0; - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT, - Collections.emptySet()); + boolean delayedDueToKnownRestart = Optional.ofNullable(nodesShutdownMetadata.get(node.nodeId())) + .map(shutdown -> Type.RESTART.equals(shutdown.getType()) && shutdown.getAllocationDelay().nanos() > 0) + .orElse(false); + boolean delayed = delayedDueToKnownRestart + || INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).nanos() > 0; + + UnassignedInfo unassignedInfo = new UnassignedInfo( + unassignedReason, + "node_left [" + node.nodeId() + "]", + null, + 0, + allocation.getCurrentNanoTime(), + System.currentTimeMillis(), + delayed, + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + shardRouting.currentNodeId() + ); allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetadata, allocation.changes()); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 59a332d6ce5bf..0b223d0e69c3d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -138,11 +138,22 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { final ShardRouting shardRouting = unassignedIterator.next(); final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) { - unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), - unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, - unassignedInfo.getFailedNodeIds()), - shardRouting.recoverySource(), allocation.changes()); + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + unassignedInfo.getNumFailedAllocations(), + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + unassignedInfo.isDelayed(), + AllocationStatus.DECIDERS_NO, + unassignedInfo.getFailedNodeIds(), + unassignedInfo.getLastAllocatedNodeId() + ), + shardRouting.recoverySource(), + allocation.changes() + ); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index eeb76f336bd59..980d88d3c42a8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -129,7 +129,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean 
explain) ", " + shardRouting.unassignedInfo().getMessage(); unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, - shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet()); + shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet(), null); } initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2108585a54c5b..43adcf9ef70e1 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -9,6 +9,7 @@ package org.elasticsearch.gateway; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -23,10 +24,10 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult.ShardStoreInfo; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; @@ -103,7 +104,7 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) 
== f "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds, null); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metadata.getIndexSafe(shard.index()), allocation.changes())); @@ -213,7 +214,11 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas Metadata metadata = allocation.metadata(); IndexMetadata indexMetadata = metadata.index(unassignedShard.index()); totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis(); - long remainingDelayNanos = unassignedInfo.getRemainingDelay(System.nanoTime(), indexMetadata.getSettings()); + long remainingDelayNanos = unassignedInfo.getRemainingDelay( + System.nanoTime(), + indexMetadata.getSettings(), + metadata.nodeShutdowns() + ); remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis(); } return AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java index 363659fbacec0..cfa3909c4db5b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.TimeValue; import 
org.elasticsearch.test.AbstractDiffableSerializationTestCase; import java.io.IOException; @@ -78,11 +79,16 @@ protected NodesShutdownMetadata createTestInstance() { } private SingleNodeShutdownMetadata randomNodeShutdownInfo() { - return SingleNodeShutdownMetadata.builder().setNodeId(randomAlphaOfLength(5)) - .setType(randomBoolean() ? SingleNodeShutdownMetadata.Type.REMOVE : SingleNodeShutdownMetadata.Type.RESTART) + final SingleNodeShutdownMetadata.Type type = randomFrom(SingleNodeShutdownMetadata.Type.values()); + final SingleNodeShutdownMetadata.Builder builder = SingleNodeShutdownMetadata.builder() + .setNodeId(randomAlphaOfLength(5)) + .setType(type) .setReason(randomAlphaOfLength(5)) - .setStartedAtMillis(randomNonNegativeLong()) - .build(); + .setStartedAtMillis(randomNonNegativeLong()); + if (type.equals(SingleNodeShutdownMetadata.Type.RESTART) && randomBoolean()) { + builder.setAllocationDelay(TimeValue.parseTimeValue(randomTimeValue(), this.getTestName())); + } + return builder.build(); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 0b5e08c269ca0..ab7b27dfa6bf4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.Collections; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -98,7 +99,20 @@ public void testMoveToUnassigned() { shard = shard.moveToStarted(); logger.info("-- move to unassigned"); - shard = shard.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null)); + shard = shard.moveToUnassigned( + new UnassignedInfo( + UnassignedInfo.Reason.NODE_LEFT, + this.getTestName(), + null, + 0, + System.nanoTime(), + 
System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + randomAlphaOfLength(10) + ) + ); assertThat(shard.allocationId(), nullValue()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java b/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java index 14ae849dee906..de6de339e6ada 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java @@ -10,6 +10,7 @@ import java.util.Set; +import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomInt; @@ -26,9 +27,9 @@ public static ShardRouting randomChange(ShardRouting shardRouting, Set n switch (randomInt(2)) { case 0: if (shardRouting.unassigned() == false && shardRouting.primary() == false) { - shardRouting = shardRouting.moveToUnassigned(new UnassignedInfo(randomReason(), randomAlphaOfLength(10))); + shardRouting = shardRouting.moveToUnassigned(randomUnassignedInfo(randomAlphaOfLength(10), false)); } else if (shardRouting.unassignedInfo() != null) { - shardRouting = shardRouting.updateUnassigned(new UnassignedInfo(randomReason(), randomAlphaOfLength(10)), + shardRouting = shardRouting.updateUnassigned(randomUnassignedInfo(randomAlphaOfLength(10), false), shardRouting.recoverySource()); } break; @@ -45,30 +46,4 @@ public static ShardRouting randomChange(ShardRouting shardRouting, Set n } return shardRouting; } - - - public static UnassignedInfo.Reason randomReason() { - switch (randomInt(9)) { - case 0: - return UnassignedInfo.Reason.INDEX_CREATED; - case 1: - return UnassignedInfo.Reason.CLUSTER_RECOVERED; - case 2: - return 
UnassignedInfo.Reason.INDEX_REOPENED; - case 3: - return UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED; - case 4: - return UnassignedInfo.Reason.NEW_INDEX_RESTORED; - case 5: - return UnassignedInfo.Reason.EXISTING_INDEX_RESTORED; - case 6: - return UnassignedInfo.Reason.REPLICA_ADDED; - case 7: - return UnassignedInfo.Reason.ALLOCATION_FAILED; - case 8: - return UnassignedInfo.Reason.NODE_LEFT; - default: - return UnassignedInfo.Reason.REROUTE_CANCELLED; - } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 092f7c651b075..21c1e8b3484fd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -10,12 +10,14 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.randomizedtesting.generators.RandomPicks; + import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; @@ -25,6 +27,7 @@ import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.repositories.IndexId; @@ -34,6 +37,9 @@ import java.io.IOException; import java.nio.ByteBuffer; 
import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -64,7 +70,8 @@ public void testReasonOrdinalOrder() { UnassignedInfo.Reason.PRIMARY_FAILED, UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, UnassignedInfo.Reason.MANUAL_ALLOCATION, - UnassignedInfo.Reason.INDEX_CLOSED,}; + UnassignedInfo.Reason.INDEX_CLOSED, + UnassignedInfo.Reason.NODE_RESTARTING}; for (int i = 0; i < order.length; i++) { assertThat(order[i].ordinal(), equalTo(i)); } @@ -76,10 +83,41 @@ public void testSerialization() throws Exception { int failedAllocations = randomIntBetween(1, 100); Set failedNodes = IntStream.range(0, between(0, failedAllocations)) .mapToObj(n -> "failed-node-" + n).collect(Collectors.toSet()); - UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, - failedAllocations, System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, failedNodes): - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + + UnassignedInfo meta; + if (reason == UnassignedInfo.Reason.ALLOCATION_FAILED) { + meta = new UnassignedInfo( + reason, + randomBoolean() ? randomAlphaOfLength(4) : null, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + false, + AllocationStatus.NO_ATTEMPT, + failedNodes, + null); + } else if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + String lastAssignedNodeId = randomAlphaOfLength(10); + if (reason == UnassignedInfo.Reason.NODE_LEFT && randomBoolean()) { + // If the reason is `NODE_LEFT`, sometimes we'll have an empty lastAllocatedNodeId due to BWC + lastAssignedNodeId = null; + } + meta = new UnassignedInfo( + reason, + randomBoolean() ? 
randomAlphaOfLength(4) : null, + null, + 0, + System.nanoTime(), + System.currentTimeMillis(), + false, + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAssignedNodeId + ); + } else { + meta = new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + } BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); out.close(); @@ -91,6 +129,7 @@ public void testSerialization() throws Exception { assertThat(read.getDetails(), equalTo(meta.getDetails())); assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds())); + assertThat(read.getLastAllocatedNodeId(), equalTo(meta.getLastAllocatedNodeId())); } public void testIndexCreated() { @@ -290,23 +329,168 @@ public void testFailedShard() { } /** - * Verifies that delayed allocation calculation are correct. + * Verifies that delayed allocation calculation are correct when there are no registered node shutdowns. + */ + public void testRemainingDelayCalculationWithNoShutdowns() throws Exception { + checkRemainingDelayCalculation( + "bogusNodeId", + TimeValue.timeValueNanos(10), + Collections.emptyMap(), + TimeValue.timeValueNanos(10), + false + ); + } + + /** + * Verifies that delayed allocation calculations are correct when there are registered node shutdowns for nodes which are not relevant + * to the shard currently being evaluated. 
+ */ + public void testRemainingDelayCalculationsWithUnrelatedShutdowns() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + int numberOfShutdowns = randomIntBetween(1,15); + for (int i = 0; i <= numberOfShutdowns; i++) { + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(randomValueOtherThan(lastNodeId, () -> randomAlphaOfLengthBetween(5,10))) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(randomFrom(EnumSet.allOf(SingleNodeShutdownMetadata.Type.class))) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + } + checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueNanos(10), false); + } + + /** + * Verifies that delay calculation is not impacted when the node the shard was last assigned to was registered for removal. + */ + public void testRemainingDelayCalculationWhenNodeIsShuttingDownForRemoval() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueNanos(10), false); + } + + /** + * Verifies that the delay calculation uses the configured delay value for nodes known to be restarting, because they are registered for + * a `RESTART`-type shutdown, rather than the default global delay. 
*/ - public void testRemainingDelayCalculation() throws Exception { + public void testRemainingDelayCalculationWhenNodeIsKnownToBeRestartingWithCustomDelay() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .setAllocationDelay(TimeValue.timeValueMinutes(1)) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + // Use a different index-level delay so this test will fail if that one gets used instead of the one from the shutdown metadata + checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueMinutes(1), true); + } + + /** + * Verifies that the delay calculation uses the default delay value for nodes known to be restarting, because they are registered for + * a `RESTART`-type shutdown, rather than the default global delay. + */ + public void testRemainingDelayCalculationWhenNodeIsKnownToBeRestartingWithDefaultDelay() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + + // Note that we do not explicitly configure the reallocation delay here. 
+ SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + // Use a different index-level delay so this test will fail if that one gets used instead of the one from the shutdown metadata + checkRemainingDelayCalculation( + lastNodeId, + TimeValue.timeValueNanos(10), + shutdowns, + SingleNodeShutdownMetadata.DEFAULT_RESTART_SHARD_ALLOCATION_DELAY, + true + ); + } + + public void testRemainingDelayUsesIndexLevelDelayIfNodeWasNotRestartingWhenShardBecameUnassigned() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + + // Generate a random time value - but don't use nanos as extremely small values of nanos can break assertion calculations + final TimeValue shutdownDelay = TimeValue.parseTimeValue( + randomTimeValue(100, 1000, "d", "h", "ms", "s", "m", "micros"), + this.getTestName() + ); + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .setAllocationDelay(shutdownDelay) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + // We want an index level delay that's less than the shutdown delay to avoid picking the index-level delay because it's larger + final TimeValue indexLevelDelay = randomValueOtherThanMany( + tv -> shutdownDelay.compareTo(tv) < 0, + () -> TimeValue.parseTimeValue(randomTimeValue(1, 1000, "d", "h", "ms", "s", "m", "micros"), this.getTestName()) + ); + + logger.info("index level delay: {}, shutdown delay: {}", indexLevelDelay, shutdownDelay); + checkRemainingDelayCalculation( + lastNodeId, + indexLevelDelay, + shutdowns, + indexLevelDelay, + false + ); + } + + private void 
checkRemainingDelayCalculation( + String lastNodeId, + TimeValue indexLevelTimeoutSetting, + Map nodeShutdowns, + TimeValue expectedTotalDelay, + boolean nodeRestarting + ) throws Exception { final long baseTime = System.nanoTime(); - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, - System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, Collections.emptySet()); - final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); + UnassignedInfo unassignedInfo = new UnassignedInfo( + nodeRestarting ? UnassignedInfo.Reason.NODE_RESTARTING : UnassignedInfo.Reason.NODE_LEFT, + "test", + null, + 0, + baseTime, + System.currentTimeMillis(), + randomBoolean(), + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastNodeId + ); + final long totalDelayNanos = expectedTotalDelay.nanos(); final Settings indexSettings = Settings.builder() - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build(); - long delay = unassignedInfo.getRemainingDelay(baseTime, indexSettings); + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), indexLevelTimeoutSetting) + .build(); + long delay = unassignedInfo.getRemainingDelay(baseTime, indexSettings, nodeShutdowns); assertThat(delay, equalTo(totalDelayNanos)); - long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1)); - delay = unassignedInfo.getRemainingDelay(baseTime + delta1, indexSettings); + long delta1 = randomLongBetween(1, (totalDelayNanos - 1)); + delay = unassignedInfo.getRemainingDelay(baseTime + delta1, indexSettings, nodeShutdowns); assertThat(delay, equalTo(totalDelayNanos - delta1)); - delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos, indexSettings); + delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos, indexSettings, nodeShutdowns); assertThat(delay, equalTo(0L)); - delay = unassignedInfo.getRemainingDelay(baseTime + 
totalDelayNanos + randomIntBetween(1, 20), indexSettings); + delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), indexSettings, nodeShutdowns); assertThat(delay, equalTo(0L)); } @@ -386,4 +570,39 @@ public void testAllocationStatusSerialization() throws IOException { assertThat(readStatus, equalTo(allocationStatus)); } } + + public static UnassignedInfo randomUnassignedInfo(String message) { + return randomUnassignedInfo(message, null); + } + + /** + * Randomly generates an UnassignedInfo. + * @param message The message to be used. + * @param delayed Used for the `delayed` flag if provided. + * @return A randomly-generated UnassignedInfo with the given message and delayed value (if any) + */ + public static UnassignedInfo randomUnassignedInfo(String message, @Nullable Boolean delayed) { + UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.values()); + String lastAllocatedNodeId = null; + boolean delayedFlag = delayed == null ? false : delayed; + if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + if (randomBoolean() && delayed == null) { + delayedFlag = true; + } + lastAllocatedNodeId = randomAlphaOfLength(10); + } + int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 
1 : 0; + return new UnassignedInfo( + reason, + message, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + delayedFlag, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAllocatedNodeId + ); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index c25ca14028481..fc3b66bd34a70 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -108,9 +108,17 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { shardState = RestoreInProgress.State.FAILURE; UnassignedInfo currentInfo = primary.unassignedInfo(); - UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), - currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(), - currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds()); + UnassignedInfo newInfo = new UnassignedInfo( + currentInfo.getReason(), + currentInfo.getMessage(), + new IOException("i/o failure"), + currentInfo.getNumFailedAllocations(), + currentInfo.getUnassignedTimeInNanos(), + currentInfo.getUnassignedTimeInMillis(), + currentInfo.isDelayed(), + currentInfo.getLastAllocationStatus(), + currentInfo.getFailedNodeIds(), + currentInfo.getLastAllocatedNodeId()); primary = primary.updateUnassigned(newInfo, primary.recoverySource()); IndexRoutingTable indexRoutingTable = routingTable.index("test"); diff --git a/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java 
b/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java index 7477849c4288c..0b41e4d626032 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; @@ -25,6 +24,7 @@ import java.util.Locale; import java.util.Map; +import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo; import static org.hamcrest.Matchers.greaterThan; import static org.mockito.Mockito.mock; @@ -34,9 +34,9 @@ public void testPreferNewIndices() { RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class)); List shardRoutings = Arrays.asList( TestShardRouting.newShardRouting("oldest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); Collections.shuffle(shardRoutings, random()); for (ShardRouting routing : shardRoutings) { shards.add(routing); @@ -68,9 +68,9 @@ public void testPreferPriorityIndices() { RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class)); List shardRoutings = Arrays.asList( TestShardRouting.newShardRouting("oldest", 0, null, null, - 
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); Collections.shuffle(shardRoutings, random()); for (ShardRouting routing : shardRoutings) { shards.add(routing); @@ -102,9 +102,9 @@ public void testPreferSystemIndices() { RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class)); List shardRoutings = Arrays.asList( TestShardRouting.newShardRouting("oldest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); Collections.shuffle(shardRoutings, random()); for (ShardRouting routing : shardRoutings) { shards.add(routing); @@ -165,8 +165,7 @@ public void testPriorityComparatorSort() { for (int i = 0; i < numShards; i++) { IndexMetadata indexMeta = randomFrom(indices); shards.add(TestShardRouting.newShardRouting(indexMeta.getIndex().getName(), randomIntBetween(1, 5), null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), - "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); } shards.sort(new PriorityComparator() { @Override diff --git 
a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 0b5a289a32f5e..75369c0c89fd2 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -190,8 +190,18 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); } else { failedNodeIds = new HashSet<>(randomSubsetOf(Set.of("node-4", "node-5", "node-6", "node-7"))); - unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), - System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds); + unassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.ALLOCATION_FAILED, + null, + null, + randomIntBetween(1, 10), + System.nanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + failedNodeIds, + null + ); } RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); @@ -375,8 +385,18 @@ public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { if (randomBoolean()) { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); } else { - unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), - System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, Set.of("node-4")); + unassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.ALLOCATION_FAILED, + null, + null, + randomIntBetween(1, 10), + System.nanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + 
Set.of("node-4"), + null + ); } RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); List retentionLeases = new ArrayList<>(); @@ -417,7 +437,7 @@ public void testDoNotCancelForBrokenNode() { } UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(failedNodes.size(), 10), System.nanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes, null); RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); List retentionLeases = Arrays.asList( @@ -446,6 +466,9 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT && UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0; int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; + final String lastAllocatedNodeId = reason == UnassignedInfo.Reason.NODE_RESTARTING || randomBoolean() + ? 
randomAlphaOfLength(10) + : null; RoutingTable routingTable = RoutingTable.builder() .add(IndexRoutingTable.builder(shardId.getIndex()) .addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -454,7 +477,7 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(reason, null, null, failedAllocations, System.nanoTime(), System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT, - Collections.emptySet()))) + Collections.emptySet(), lastAllocatedNodeId))) .build()) ) .build(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 14e1ac5fb86d3..79e4cb4f76f8a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -15,10 +15,13 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; import static org.apache.lucene.util.LuceneTestCase.random; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; +import static org.elasticsearch.test.ESTestCase.randomBoolean; +import static org.elasticsearch.test.ESTestCase.randomFrom; /** * A helper that allows to create shard routing instances within tests, while not requiring to expose @@ -88,7 +91,7 @@ private static RecoverySource buildRecoveryTarget(boolean primary, ShardRoutingS case UNASSIGNED: case INITIALIZING: if (primary) { - return ESTestCase.randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, + return randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, RecoverySource.ExistingStoreRecoverySource.INSTANCE); } else { return 
RecoverySource.PeerRecoverySource.INSTANCE; @@ -120,7 +123,7 @@ private static UnassignedInfo buildUnassignedInfo(ShardRoutingState state) { switch (state) { case UNASSIGNED: case INITIALIZING: - return new UnassignedInfo(ESTestCase.randomFrom(UnassignedInfo.Reason.values()), "auto generated for test"); + return randomUnassignedInfo("auto generated for test"); case STARTED: case RELOCATING: return null; @@ -129,8 +132,33 @@ private static UnassignedInfo buildUnassignedInfo(ShardRoutingState state) { } } + public static UnassignedInfo randomUnassignedInfo(String message) { + UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.values()); + String lastAllocatedNodeId = null; + boolean delayed = false; + if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + if (randomBoolean()) { + delayed = true; + } + lastAllocatedNodeId = randomAlphaOfLength(10); + } + int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; + return new UnassignedInfo( + reason, + message, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + delayed, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAllocatedNodeId + ); + } + public static RecoverySource randomRecoverySource() { - return ESTestCase.randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, + return randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, RecoverySource.ExistingStoreRecoverySource.INSTANCE, RecoverySource.PeerRecoverySource.INSTANCE, RecoverySource.LocalShardsRecoverySource.INSTANCE, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java index 1c27a4f9ea0dd..00fb39ac1b682 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java +++ 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java @@ -244,7 +244,7 @@ public void testExecuteAllocateReplicaUnassigned() { IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)) .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), null, null, false, ShardRoutingState.UNASSIGNED, - new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "the shard is intentionally unassigned"))); + randomUnassignedInfo("the shard is intentionally unassigned"))); CheckShrinkReadyStep step = createRandomInstance(); assertAllocateStatus(index, 1, 1, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, @@ -458,4 +458,29 @@ private void assertAllocateStatus(Index index, int shards, int replicas, CheckSh assertEquals(expectedResult.isComplete(), actualResult.isComplete()); assertEquals(expectedResult.getInfomationContext(), actualResult.getInfomationContext()); } + + public static UnassignedInfo randomUnassignedInfo(String message) { + UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.values()); + String lastAllocatedNodeId = null; + boolean delayed = false; + if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + if (randomBoolean()) { + delayed = true; + } + lastAllocatedNodeId = randomAlphaOfLength(10); + } + int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 
1 : 0; + return new UnassignedInfo( + reason, + message, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + delayed, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAllocatedNodeId + ); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java index 6c321a990c219..57349053e9e11 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.core.ilm; - import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -18,8 +17,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.routing.UnassignedInfo.Reason; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xpack.core.DataTier; @@ -32,6 +29,7 @@ import static java.util.Collections.emptyMap; import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_PREFER; +import static org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStepTests.randomUnassignedInfo; import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -77,7 +75,7 @@ public void testExecuteWithUnassignedShard() { IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) 
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)) .addShard(TestShardRouting.newShardRouting(new ShardId(index, 1), null, null, true, ShardRoutingState.UNASSIGNED, - new UnassignedInfo(randomFrom(Reason.values()), "the shard is intentionally unassigned"))); + randomUnassignedInfo("the shard is intentionally unassigned"))); ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build()) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java index bbc5520dbb595..aebfa48381911 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java @@ -78,9 +78,15 @@ public void testJobsVacateShuttingDownNode() throws Exception { }); // Call the shutdown API for the chosen node. - client().execute(PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(nodeIdToShutdown.get(), randomFrom(SingleNodeShutdownMetadata.Type.values()), "just testing")) - .actionGet(); + client().execute( + PutShutdownNodeAction.INSTANCE, + new PutShutdownNodeAction.Request( + nodeIdToShutdown.get(), + randomFrom(SingleNodeShutdownMetadata.Type.values()), + "just testing", + null + ) + ).actionGet(); // Wait for the desired end state of all 6 jobs running on nodes that are not shutting down. assertBusy(() -> { @@ -145,8 +151,15 @@ public void testCloseJobVacatingShuttingDownNode() throws Exception { }); // Call the shutdown API for the chosen node. 
- client().execute(PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(nodeIdToShutdown.get(), randomFrom(SingleNodeShutdownMetadata.Type.values()), "just testing")) + client().execute( + PutShutdownNodeAction.INSTANCE, + new PutShutdownNodeAction.Request( + nodeIdToShutdown.get(), + randomFrom(SingleNodeShutdownMetadata.Type.values()), + "just testing", + null + ) + ) .actionGet(); if (randomBoolean()) { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index 199fb3f3adc96..2320a154c2a95 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -47,6 +47,7 @@ import static org.elasticsearch.common.xcontent.XContentHelper.convertToJson; import static org.elasticsearch.common.xcontent.XContentHelper.stripWhitespace; +import static org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStepTests.randomUnassignedInfo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -424,8 +425,7 @@ private static IndexRoutingTable mockIndexRoutingTable(final Index index, --unassignedTotal; --unassignedPrimaries; - final UnassignedInfo unassignedInfo = - new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), randomAlphaOfLength(3)); + final UnassignedInfo unassignedInfo = randomUnassignedInfo(randomAlphaOfLength(3)); final String nodeId; final ShardRoutingState state; diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java 
b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java new file mode 100644 index 0000000000000..c89c95d1cf332 --- /dev/null +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java @@ -0,0 +1,235 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class NodeShutdownDelayedAllocationIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ShutdownPlugin.class); + } + + public void testShardAllocationIsDelayedForRestartingNode() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + 
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + // Mark the node for shutdown + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + null // Make sure it works with the default - we'll check this override in other tests + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + internalCluster().restartNode(nodeToRestartName, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertBusy( + () -> { assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); } + ); + return super.onNodeStopped(nodeName); + } + }); + + // And the index should turn green again + ensureGreen("test"); + } + + public void testShardAllocationWillProceedAfterTimeout() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + // Mark the node for shutdown + PutShutdownNodeAction.Request putShutdownRequest = new 
PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueMillis(randomIntBetween(10, 1000)) + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + // Actually stop the node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeToRestartName)); + + // And the index should turn green again well within the 30-second timeout + ensureGreen("test"); + } + + public void testIndexLevelAllocationDelayWillBeUsedIfLongerThanShutdownDelay() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "3h") // Use a long timeout we definitely won't hit + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + // Mark the node for shutdown + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueMillis(0) // No delay for reallocating these shards, IF this timeout is used. 
+ ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + internalCluster().restartNode(nodeToRestartName, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertBusy( + () -> { assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); } + ); + return super.onNodeStopped(nodeName); + } + }); + + // And the index should turn green again + ensureGreen("test"); + } + + public void testShardAllocationTimeoutCanBeChanged() throws Exception { + String nodeToRestartId = setupLongTimeoutTestCase(); + + // Update the timeout on the shutdown request to something shorter + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueMillis(1) + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + // And the index should turn green again + ensureGreen("test"); + } + + public void testShardAllocationStartsImmediatelyIfShutdownDeleted() throws Exception { + String nodeToRestartId = setupLongTimeoutTestCase(); + + DeleteShutdownNodeAction.Request deleteShutdownRequest = new DeleteShutdownNodeAction.Request(nodeToRestartId); + AcknowledgedResponse deleteShutdownResponse = client().execute(DeleteShutdownNodeAction.INSTANCE, deleteShutdownRequest).get(); + assertTrue(deleteShutdownResponse.isAcknowledged()); + + // And the index should turn green again + ensureGreen("test"); + } + + /** + * Sets up a cluster and an index, picks a random node that has a shard, marks it for shutdown with a long timeout, and then stops the + * node. 
+ * + * @return The ID of the node that was randomly chosen to be marked for shutdown and stopped. + */ + private String setupLongTimeoutTestCase() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + { + // Mark the node for shutdown with a delay that we'll never reach in the test + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueHours(3) + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + } + + // Actually stop the node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeToRestartName)); + + // Verify that the shard's allocation is delayed + assertBusy(() -> { assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); }); + + return nodeToRestartId; + } + + private void indexRandomData() throws Exception { + int numDocs = scaledRandomIntBetween(100, 1000); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test").setSource("field", "value"); + } + indexRandom(true, builders); + } + + private String findIdOfNodeWithShard() { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + List startedShards = 
state.routingTable().shardsWithState(ShardRoutingState.STARTED); + Collections.shuffle(startedShards, random()); + return startedShards.get(0).currentNodeId(); + } + + private String findNodeNameFromId(String id) { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + return state.nodes().get(id).getName(); + } +} diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java index 29325c3626277..f216dcd7562ac 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java @@ -80,7 +80,7 @@ public void testShutdownAwarePlugin() throws Exception { // Mark the node as shutting down client().execute( PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing") + new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing", null) ).get(); GetShutdownStatusAction.Response getResp = client().execute( diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java index a55c3dab3ad38..dd50014d6a84d 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java @@ -113,7 +113,7 @@ public void testTasksAreNotAssignedToShuttingDownNode() throws Exception { // Mark the node as shutting down client().execute( 
PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing") + new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing", null) ).get(); // Tell the persistent task executor it can start allocating the task diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java index c3f7081185060..388227e5c6328 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java @@ -18,6 +18,8 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import java.io.IOException; @@ -35,42 +37,49 @@ public static class Request extends AcknowledgedRequest { private final String nodeId; private final SingleNodeShutdownMetadata.Type type; private final String reason; + @Nullable + private final TimeValue allocationDelay; private static final ParseField TYPE_FIELD = new ParseField("type"); - public static final ParseField REASON_FIELD = new ParseField("reason"); + private static final ParseField REASON_FIELD = new ParseField("reason"); + private static final ParseField ALLOCATION_DELAY_FIELD = new ParseField("allocation_delay"); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "put_node_shutdown_request", false, - (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.parse((String) a[0]), (String) a[1]) + (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.parse((String) a[0]), (String) a[1], 
a[2] == null ? null : TimeValue.parseTimeValue((String) a[2], ALLOCATION_DELAY_FIELD.getPreferredName())) ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), REASON_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ALLOCATION_DELAY_FIELD); } public static Request parseRequest(String nodeId, XContentParser parser) { return PARSER.apply(parser, nodeId); } - public Request(String nodeId, SingleNodeShutdownMetadata.Type type, String reason) { + public Request(String nodeId, SingleNodeShutdownMetadata.Type type, String reason, @Nullable TimeValue allocationDelay) { this.nodeId = nodeId; this.type = type; this.reason = reason; + this.allocationDelay = allocationDelay; } public Request(StreamInput in) throws IOException { this.nodeId = in.readString(); this.type = in.readEnum(SingleNodeShutdownMetadata.Type.class); this.reason = in.readString(); + this.allocationDelay = in.readOptionalTimeValue(); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(this.nodeId); + out.writeString(nodeId); out.writeEnum(type); out.writeString(reason); + out.writeOptionalTimeValue(allocationDelay); } public String getNodeId() { @@ -85,6 +94,10 @@ public String getReason() { return reason; } + public TimeValue getAllocationDelay() { + return allocationDelay; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException arve = new ActionRequestValidationException(); @@ -101,6 +114,10 @@ public ActionRequestValidationException validate() { arve.addValidationError("the reason for shutdown is required"); } + if (allocationDelay != null && SingleNodeShutdownMetadata.Type.RESTART.equals(type) == false) { + arve.addValidationError(ALLOCATION_DELAY_FIELD + " is only allowed for RESTART-type shutdown requests"); + } + if (arve.validationErrors().isEmpty() == false) { return arve; } else { diff --git
a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java index 19c06341b9003..034c9d0c4e154 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java @@ -87,6 +87,7 @@ public ClusterState execute(ClusterState currentState) { .setType(request.getType()) .setReason(request.getReason()) .setStartedAtMillis(System.currentTimeMillis()) + .setAllocationDelay(request.getAllocationDelay()) .build(); return ClusterState.builder(currentState)