From fe1194d58fdf6e6b87a4b848b68720f271f6f5a0 Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Thu, 17 Sep 2020 15:08:31 +0100 Subject: [PATCH] [7.x] ILM migrate data between tiers (#61377) (#62536) This adds ILM support for automatically migrating the managed indices between data tiers. This proposal makes use of a MigrateAction that is injected (similar to how the Unfollow action is injected) in phases that don't define index allocation rules using the AllocateAction or don't explicitly define the MigrateAction itself (regardless if it's enabled or disabled). (cherry picked from commit c1746afffd61048d0c12d3a77e6d8191a804ed49) Signed-off-by: Andrei Dan --- .../IndexLifecycleNamedXContentProvider.java | 3 + .../indexlifecycle/LifecyclePolicy.java | 7 +- .../client/indexlifecycle/MigrateAction.java | 90 ++++++++ .../client/RestHighLevelClientTests.java | 4 +- .../documentation/ILMDocumentationIT.java | 4 + .../indexlifecycle/MigrateActionTests.java | 46 ++++ docs/reference/ilm/apis/explain.asciidoc | 2 +- .../xpack/core/XPackClientPlugin.java | 2 + .../xpack/core/ilm/AllocationRoutedStep.java | 130 +++-------- .../core/ilm/DataTierMigrationRoutedStep.java | 102 +++++++++ .../xpack/core/ilm/MigrateAction.java | 127 +++++++++++ .../xpack/core/ilm/RolloverAction.java | 2 +- .../core/ilm/TimeseriesLifecycleType.java | 65 +++++- .../core/ilm/step/info/AllocationInfo.java | 120 ++++++++++ .../core/ilm/AllocationRoutedStepTests.java | 19 +- .../ilm/DataTierMigrationRoutedStepTests.java | 206 ++++++++++++++++++ .../ilm/LifecyclePolicyMetadataTests.java | 2 + .../xpack/core/ilm/LifecyclePolicyTests.java | 6 + .../xpack/core/ilm/MigrateActionTests.java | 59 +++++ .../ilm/TimeseriesLifecycleTypeTests.java | 83 ++++++- .../ilm/action/PutLifecycleRequestTests.java | 3 + .../info}/AllocationRoutedStepInfoTests.java | 28 ++- .../ilm/TimeSeriesLifecycleActionsIT.java | 18 +- .../xpack/ilm/DataTiersMigrationsTests.java | 156 +++++++++++++ .../xpack/ilm/IndexLifecycle.java | 5 +- .../ilm/IndexLifecycleMetadataTests.java | 3 + .../xpack/ilm/IndexLifecycleRunnerTests.java | 5 +- .../xpack/ilm/PolicyStepsRegistryTests.java | 5 +- 28 files changed, 1151 insertions(+), 151 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/MigrateAction.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/MigrateActionTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStep.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MigrateAction.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/step/info/AllocationInfo.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MigrateActionTests.java rename x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/{ => step/info}/AllocationRoutedStepInfoTests.java (56%) create mode 100644 x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataTiersMigrationsTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java index 90ef9d808997e..466abfa240cbc 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java @@ -57,6 +57,9 @@ public List getNamedXContentParsers() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, + new ParseField(MigrateAction.NAME), + MigrateAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java index 0e308d8bedfa9..703c6a7ef9499 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java @@ -58,9 +58,10 @@ public class LifecyclePolicy implements ToXContentObject { }, PHASES_FIELD); ALLOWED_ACTIONS.put("hot", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, RolloverAction.NAME)); - ALLOWED_ACTIONS.put("warm", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, ForceMergeAction.NAME, - ReadOnlyAction.NAME, ShrinkAction.NAME)); - ALLOWED_ACTIONS.put("cold", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME)); + ALLOWED_ACTIONS.put("warm", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, MigrateAction.NAME, AllocateAction.NAME, + ForceMergeAction.NAME, ReadOnlyAction.NAME, ShrinkAction.NAME)); + ALLOWED_ACTIONS.put("cold", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, MigrateAction.NAME, AllocateAction.NAME, + FreezeAction.NAME)); ALLOWED_ACTIONS.put("delete", Sets.newHashSet(DeleteAction.NAME)); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/MigrateAction.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/MigrateAction.java new file mode 100644 index 0000000000000..aac7a9354ed96 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/MigrateAction.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.indexlifecycle; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +public class MigrateAction implements LifecycleAction, ToXContentObject { + public static final String NAME = "migrate"; + + public static final ParseField ENABLED_FIELD = new ParseField("enabled"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + a -> new MigrateAction(a[0] == null ? true : (boolean) a[0])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ENABLED_FIELD); + } + + public static MigrateAction parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + private final boolean enabled; + + public MigrateAction() { + this(true); + } + + public MigrateAction(boolean enabled) { + this.enabled = enabled; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED_FIELD.getPreferredName(), enabled); + builder.endObject(); + return builder; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public int hashCode() { + return Objects.hashCode(enabled); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + return true; + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 91051c2503330..77bf8126d5b1d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -705,7 +705,7 @@ public void testDefaultNamedXContents() { public void testProvidedNamedXContents() { List namedXContents = RestHighLevelClient.getProvidedNamedXContents(); - assertEquals(70, namedXContents.size()); + assertEquals(71, namedXContents.size()); Map, Integer> categories = new HashMap<>(); List names = new ArrayList<>(); for (NamedXContentRegistry.Entry namedXContent : namedXContents) { @@ -731,7 +731,7 @@ public void testProvidedNamedXContents() { assertTrue(names.contains(MeanReciprocalRank.NAME)); assertTrue(names.contains(DiscountedCumulativeGain.NAME)); assertTrue(names.contains(ExpectedReciprocalRank.NAME)); - assertEquals(Integer.valueOf(9), categories.get(LifecycleAction.class)); + assertEquals(Integer.valueOf(10), categories.get(LifecycleAction.class)); assertTrue(names.contains(UnfollowAction.NAME)); assertTrue(names.contains(AllocateAction.NAME)); assertTrue(names.contains(DeleteAction.NAME)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ILMDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ILMDocumentationIT.java index 2744d34423cef..f29ff65d6a4ba 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ILMDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ILMDocumentationIT.java @@ -362,6 +362,7 @@ public void testExplainLifecycle() throws Exception { CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index-1") .settings(Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("index.lifecycle.name", "my_policy") .put("index.lifecycle.rollover_alias", "my_alias") .build()); @@ -370,6 +371,7 @@ public void testExplainLifecycle() throws Exception { CreateIndexRequest createOtherIndexRequest = new CreateIndexRequest("other_index") .settings(Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build()); client.indices().create(createOtherIndexRequest, RequestOptions.DEFAULT); @@ -624,6 +626,7 @@ public void testRetryPolicy() throws Exception { CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index") .settings(Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("index.lifecycle.name", "my_policy") .build()); client.indices().create(createIndexRequest, RequestOptions.DEFAULT); @@ -689,6 +692,7 @@ public void testRemovePolicyFromIndex() throws Exception { CreateIndexRequest createIndexRequest = new CreateIndexRequest("my_index") .settings(Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("index.lifecycle.name", "my_policy") .build()); client.indices().create(createIndexRequest, RequestOptions.DEFAULT); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/MigrateActionTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/MigrateActionTests.java new file mode 100644 index 0000000000000..cb9a16fe108db --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/MigrateActionTests.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.indexlifecycle; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class MigrateActionTests extends AbstractXContentTestCase { + + @Override + protected MigrateAction doParseInstance(XContentParser parser) throws IOException { + return MigrateAction.parse(parser); + } + + @Override + protected MigrateAction createTestInstance() { + return randomInstance(); + } + + static MigrateAction randomInstance() { + return new MigrateAction(randomBoolean()); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/docs/reference/ilm/apis/explain.asciidoc b/docs/reference/ilm/apis/explain.asciidoc index 66984b7ac4a36..17f016a004032 100644 --- a/docs/reference/ilm/apis/explain.asciidoc +++ b/docs/reference/ilm/apis/explain.asciidoc @@ -217,7 +217,7 @@ information for the step that's being performed on the index. "message": "Waiting for all shard copies to be active", "shards_left_to_allocate": -1, "all_shards_active": false, - "actual_replicas": 2 + "number_of_replicas": 2 }, "phase_execution": { "policy": "my_lifecycle3", diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index f6476fbd47d92..cb7c9eb2242f5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -66,6 +66,7 @@ import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleAction; import org.elasticsearch.xpack.core.ilm.LifecycleType; +import org.elasticsearch.xpack.core.ilm.MigrateAction; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction; @@ -623,6 +624,7 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new), // Transforms new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.TRANSFORM, TransformFeatureSetUsage::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TransformField.TASK_NAME, TransformTaskParams::new), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStep.java index fa3257d3f9c12..32be0f6ca679c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStep.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.core.ilm; import com.carrotsearch.hppc.cursors.ObjectCursor; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.support.ActiveShardCount; @@ -18,19 +17,15 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.Index; -import java.io.IOException; import java.util.Collections; -import java.util.Objects; + +import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.allShardsActiveAllocationInfo; +import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo; /** * Checks whether all shards have been correctly routed in response to an update to the allocation rules for an index. @@ -41,7 +36,7 @@ public class AllocationRoutedStep extends ClusterStateWaitStep { private static final Logger logger = LogManager.getLogger(AllocationRoutedStep.class); private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Collections.singletonList( - new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)))); + new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)))); AllocationRoutedStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey); @@ -57,13 +52,26 @@ public Result isConditionMet(Index index, ClusterState clusterState) { } if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) { logger.debug("[{}] lifecycle action for index [{}] cannot make progress because not all shards are active", - getKey().getAction(), index.getName()); - return new Result(false, new Info(idxMeta.getNumberOfReplicas(), -1, false)); + getKey().getAction(), index.getName()); + return new Result(false, waitingForActiveShardsAllocationInfo(idxMeta.getNumberOfReplicas())); } + int allocationPendingAllShards = getPendingAllocations(index, ALLOCATION_DECIDERS, clusterState); + + if (allocationPendingAllShards > 0) { + logger.debug("{} lifecycle action [{}] waiting for [{}] shards to be allocated to nodes matching the given filters", + index, getKey().getAction(), allocationPendingAllShards); + return new Result(false, allShardsActiveAllocationInfo(idxMeta.getNumberOfReplicas(), allocationPendingAllShards)); + } else { + logger.debug("{} lifecycle action for [{}] complete", index, getKey().getAction()); + return new Result(true, null); + } + } + + static int getPendingAllocations(Index index, AllocationDeciders allocationDeciders, ClusterState clusterState) { // All the allocation attributes are already set so just need to check // if the allocation has happened - RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null, - System.nanoTime()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState.getRoutingNodes(), clusterState, null, + System.nanoTime()); int allocationPendingAllShards = 0; @@ -71,23 +79,15 @@ public Result isConditionMet(Index index, ClusterState clusterState) { for (ObjectCursor shardRoutingTable : allShards.values()) { for (ShardRouting shardRouting : shardRoutingTable.value.shards()) { String currentNodeId = shardRouting.currentNodeId(); - boolean canRemainOnCurrentNode = ALLOCATION_DECIDERS - .canRemain(shardRouting, clusterState.getRoutingNodes().node(currentNodeId), allocation) - .type() == Decision.Type.YES; + boolean canRemainOnCurrentNode = allocationDeciders + .canRemain(shardRouting, clusterState.getRoutingNodes().node(currentNodeId), allocation) + .type() == Decision.Type.YES; if (canRemainOnCurrentNode == false || shardRouting.started() == false) { allocationPendingAllShards++; } } } - - if (allocationPendingAllShards > 0) { - logger.debug("{} lifecycle action [{}] waiting for [{}] shards to be allocated to nodes matching the given filters", - index, getKey().getAction(), allocationPendingAllShards); - return new Result(false, new Info(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true)); - } else { - logger.debug("{} lifecycle action for [{}] complete", index, getKey().getAction()); - return new Result(true, null); - } + return allocationPendingAllShards; } @Override @@ -105,84 +105,4 @@ public boolean equals(Object obj) { } return super.equals(obj); } - - public static final class Info implements ToXContentObject { - - private final long actualReplicas; - private final long numberShardsLeftToAllocate; - private final boolean allShardsActive; - private final String message; - - static final ParseField ACTUAL_REPLICAS = new ParseField("actual_replicas"); - static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate"); - static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active"); - static final ParseField MESSAGE = new ParseField("message"); - static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("allocation_routed_step_info", - a -> new Info((long) a[0], (long) a[1], (boolean) a[2])); - static { - PARSER.declareLong(ConstructingObjectParser.constructorArg(), ACTUAL_REPLICAS); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE); - PARSER.declareString((i, s) -> {}, MESSAGE); - } - - public Info(long actualReplicas, long numberShardsLeftToAllocate, boolean allShardsActive) { - this.actualReplicas = actualReplicas; - this.numberShardsLeftToAllocate = numberShardsLeftToAllocate; - this.allShardsActive = allShardsActive; - if (allShardsActive == false) { - message = "Waiting for all shard copies to be active"; - } else { - message = "Waiting for [" + numberShardsLeftToAllocate + "] shards " - + "to be allocated to nodes matching the given filters"; - } - } - - public long getActualReplicas() { - return actualReplicas; - } - - public long getNumberShardsLeftToAllocate() { - return numberShardsLeftToAllocate; - } - - public boolean allShardsActive() { - return allShardsActive; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(MESSAGE.getPreferredName(), message); - builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate); - builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive); - builder.field(ACTUAL_REPLICAS.getPreferredName(), actualReplicas); - builder.endObject(); - return builder; - } - - @Override - public int hashCode() { - return Objects.hash(actualReplicas, numberShardsLeftToAllocate, allShardsActive); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - Info other = (Info) obj; - return Objects.equals(actualReplicas, other.actualReplicas) && - Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) && - Objects.equals(allShardsActive, other.allShardsActive); - } - - @Override - public String toString() { - return Strings.toString(this); - } - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStep.java new file mode 100644 index 0000000000000..7d451c042c0eb --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStep.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ilm; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; +import org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo; + +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; + +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE; +import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_INCLUDE_SETTING; +import static org.elasticsearch.xpack.core.ilm.AllocationRoutedStep.getPendingAllocations; + +/** + * Checks whether all shards have been correctly routed in response to updating the allocation rules for an index in order + * to migrate the index to a new tier. + */ +public class DataTierMigrationRoutedStep extends ClusterStateWaitStep { + public static final String NAME = "check-migration"; + + private static final Logger logger = LogManager.getLogger(DataTierMigrationRoutedStep.class); + + private static final Set> ALL_CLUSTER_SETTINGS; + + static { + Set> allSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_REQUIRE_SETTING); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_INCLUDE_SETTING); + allSettings.add(DataTierAllocationDecider.CLUSTER_ROUTING_EXCLUDE_SETTING); + ALL_CLUSTER_SETTINGS = allSettings; + } + + private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders( + org.elasticsearch.common.collect.List.of( + new DataTierAllocationDecider(new ClusterSettings(Settings.EMPTY, ALL_CLUSTER_SETTINGS)) + ) + ); + + DataTierMigrationRoutedStep(StepKey key, StepKey nextStepKey) { + super(key, nextStepKey); + } + + @Override + public boolean isRetryable() { + return true; + } + + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + IndexMetadata idxMeta = clusterState.metadata().index(index); + if (idxMeta == null) { + // Index must have been since deleted, ignore it + logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName()); + return new Result(false, null); + } + String destinationTier = INDEX_ROUTING_INCLUDE_SETTING.get(idxMeta.getSettings()); + if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) { + logger.debug("[{}] migration of index [{}] to the [{}] tier cannot progress, as not all shards are active", + getKey().getAction(), index.getName(), destinationTier); + return new Result(false, AllocationInfo.waitingForActiveShardsAllocationInfo(idxMeta.getNumberOfReplicas())); + } + + int allocationPendingAllShards = getPendingAllocations(index, ALLOCATION_DECIDERS, clusterState); + + if (allocationPendingAllShards > 0) { + boolean targetTierNodeFound = false; + for (DiscoveryNode node : clusterState.nodes()) { + for (DiscoveryNodeRole role : node.getRoles()) { + if (role.roleName().equals(DATA_ROLE.roleName()) || role.roleName().equals(destinationTier)) { + targetTierNodeFound = true; + break; + } + } + } + String statusMessage = String.format(Locale.ROOT, "%s lifecycle action [%s] waiting for [%s] shards to be moved to the [%s] " + + "tier" + (targetTierNodeFound ? "" : " but there are currently no [%s] nodes in the cluster"), + index, getKey().getAction(), allocationPendingAllShards, destinationTier, destinationTier); + logger.debug(statusMessage); + return new Result(false, new AllocationInfo(idxMeta.getNumberOfReplicas(), allocationPendingAllShards, true, statusMessage)); + } else { + logger.debug("[{}] migration of index [{}] to tier [{}] complete", getKey().getAction(), index, destinationTier); + return new Result(true, null); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MigrateAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MigrateAction.java new file mode 100644 index 0000000000000..4845c62da9949 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MigrateAction.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ilm; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; +import org.elasticsearch.xpack.core.DataTier; +import org.elasticsearch.xpack.core.ilm.Step.StepKey; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * A {@link LifecycleAction} which enables or disables the automatic migration of data between + * {@link org.elasticsearch.xpack.core.DataTier}s. + */ +public class MigrateAction implements LifecycleAction { + public static final String NAME = "migrate"; + public static final ParseField ENABLED_FIELD = new ParseField("enabled"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + a -> new MigrateAction(a[0] == null ? true : (boolean) a[0])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ENABLED_FIELD); + } + + private final boolean enabled; + + public static MigrateAction parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + public MigrateAction() { + this(true); + } + + public MigrateAction(boolean enabled) { + this.enabled = enabled; + } + + public MigrateAction(StreamInput in) throws IOException { + this(in.readBoolean()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(enabled); + } + + @Override + public String getWriteableName() { + return NAME; + } + + public boolean isEnabled() { + return enabled; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED_FIELD.getPreferredName(), enabled); + builder.endObject(); + return builder; + } + + @Override + public boolean isSafeAction() { + return true; + } + + @Override + public List toSteps(Client client, String phase, StepKey nextStepKey) { + if (enabled) { + StepKey migrationKey = new StepKey(phase, NAME, NAME); + StepKey migrationRoutedKey = new StepKey(phase, NAME, DataTierMigrationRoutedStep.NAME); + + Settings.Builder migrationSettings = Settings.builder(); + String dataTierName = "data_" + phase; + assert DataTier.validTierName(dataTierName) : "invalid data tier name:" + dataTierName; + migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_INCLUDE, dataTierName); + UpdateSettingsStep updateMigrationSettingStep = new UpdateSettingsStep(migrationKey, migrationRoutedKey, client, + migrationSettings.build()); + DataTierMigrationRoutedStep migrationRoutedStep = new DataTierMigrationRoutedStep(migrationRoutedKey, nextStepKey); + return Arrays.asList(updateMigrationSettingStep, migrationRoutedStep); + } else { + return org.elasticsearch.common.collect.List.of(); + } + } + + @Override + public int hashCode() { + return Objects.hash(enabled); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + MigrateAction other = (MigrateAction) obj; + return Objects.equals(enabled, other.enabled); + } + + @Override + public String toString() { + return Strings.toString(this); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java index bf1cd9e2b9fd8..6fa1114817130 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java @@ -27,7 +27,7 @@ import java.util.Objects; /** - * A {@link LifecycleAction} which deletes the index. + * A {@link LifecycleAction} which rolls over the index. */ public class RolloverAction implements LifecycleAction { public static final String NAME = "rollover"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java index c23143aecda0d..2360141f189e2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ilm; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.set.Sets; @@ -19,6 +20,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; + /** * Represents the lifecycle of an index from creation to deletion. A * {@link TimeseriesLifecycleType} is made up of a set of {@link Phase}s which it will @@ -40,9 +43,9 @@ public class TimeseriesLifecycleType implements LifecycleType { static final List ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME, ForceMergeAction.NAME); static final List ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, - AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME); + AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME); static final List ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME, - FreezeAction.NAME, SearchableSnapshotAction.NAME); + MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME); static final List ORDERED_VALID_FROZEN_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME); static final List ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME); @@ -51,7 +54,7 @@ public class TimeseriesLifecycleType implements LifecycleType { static final Set VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS); static final Set VALID_FROZEN_ACTIONS = Sets.newHashSet(ORDERED_VALID_FROZEN_ACTIONS); static final Set VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS); - private static Map> ALLOWED_ACTIONS = new HashMap<>(); + private static final Map> ALLOWED_ACTIONS = new HashMap<>(); static { ALLOWED_ACTIONS.put(HOT_PHASE, VALID_HOT_ACTIONS); @@ -86,12 +89,33 @@ public List getOrderedPhases(Map phases) { actionMap.put(UnfollowAction.NAME, new UnfollowAction()); phase = new Phase(phase.getName(), phase.getMinimumAge(), actionMap); } + + if (shouldInjectMigrateStepForPhase(phase)) { + Map actionMap = new HashMap<>(phase.getActions()); + actionMap.put(MigrateAction.NAME, new MigrateAction(true)); + phase = new Phase(phase.getName(), phase.getMinimumAge(), actionMap); + } + orderedPhases.add(phase); } } return orderedPhases; } + static boolean shouldInjectMigrateStepForPhase(Phase phase) { + AllocateAction allocateAction = (AllocateAction) phase.getActions().get(AllocateAction.NAME); + if (allocateAction != null) { + if (definesAllocationRules(allocateAction)) { + // we won't automatically migrate the data if an allocate action that defines any allocation rule is present + return false; + } + } + + MigrateAction migrateAction = (MigrateAction) phase.getActions().get(MigrateAction.NAME); + // if the user configured the {@link MigrateAction} already we won't automatically configure it + return migrateAction == null; + } + @Override public String getNextPhaseName(String currentPhaseName, Map phases) { int index = VALID_PHASES.indexOf(currentPhaseName); @@ -121,7 +145,7 @@ public String getPreviousPhaseName(String currentPhaseName, Map p throw new IllegalArgumentException("[" + currentPhaseName + "] is not a valid phase for lifecycle type [" + TYPE + "]"); } else { // Find the previous phase before `index` that exists in `phases` and return it - while (--index >=0) { + while (--index >= 0) { String phaseName = VALID_PHASES.get(index); if (phases.containsKey(phaseName)) { return phaseName; @@ -139,19 +163,19 @@ public List getOrderedActions(Phase phase) { switch (phase.getName()) { case HOT_PHASE: return ORDERED_VALID_HOT_ACTIONS.stream().map(a -> actions.getOrDefault(a, null)) - .filter(Objects::nonNull).collect(Collectors.toList()); + .filter(Objects::nonNull).collect(toList()); case WARM_PHASE: - return ORDERED_VALID_WARM_ACTIONS.stream() .map(a -> actions.getOrDefault(a, null)) - .filter(Objects::nonNull).collect(Collectors.toList()); + return ORDERED_VALID_WARM_ACTIONS.stream().map(a -> actions.getOrDefault(a, null)) + .filter(Objects::nonNull).collect(toList()); case COLD_PHASE: return ORDERED_VALID_COLD_ACTIONS.stream().map(a -> actions.getOrDefault(a, null)) - .filter(Objects::nonNull).collect(Collectors.toList()); + .filter(Objects::nonNull).collect(toList()); case FROZEN_PHASE: return ORDERED_VALID_FROZEN_ACTIONS.stream().map(a -> actions.getOrDefault(a, null)) .filter(Objects::nonNull).collect(Collectors.toList()); case DELETE_PHASE: return ORDERED_VALID_DELETE_ACTIONS.stream().map(a -> actions.getOrDefault(a, null)) - .filter(Objects::nonNull).collect(Collectors.toList()); + .filter(Objects::nonNull).collect(toList()); default: throw new IllegalArgumentException("lifecycle type[" + TYPE + "] does not support phase[" + phase.getName() + "]"); } @@ -183,7 +207,7 @@ public String getNextActionName(String currentActionName, Phase phase) { int index = orderedActionNames.indexOf(currentActionName); if (index < 0) { throw new IllegalArgumentException("[" + currentActionName + "] is not a valid action for phase [" + phase.getName() - + "] in lifecycle type [" + TYPE + "]"); + + "] in lifecycle type [" + TYPE + "]"); } else { // Find the next action after `index` that exists in the phase and return it while (++index < orderedActionNames.size()) { @@ -208,7 +232,7 @@ public void validate(Collection phases) { phase.getActions().forEach((actionName, action) -> { if (ALLOWED_ACTIONS.get(phase.getName()).contains(actionName) == false) { throw new IllegalArgumentException("invalid action [" + actionName + "] " + - "defined in phase [" + phase.getName() +"]"); + "defined in phase [" + phase.getName() + "]"); } }); }); @@ -226,5 +250,24 @@ public void validate(Collection phases) { "] action may not be used in the [" + HOT_PHASE + "] phase without an accompanying [" + RolloverAction.NAME + "] action"); } + + // look for phases that have the migrate action enabled and also specify allocation rules via the AllocateAction + String phasesWithConflictingMigrationActions = phases.stream() + .filter(phase -> phase.getActions().containsKey(MigrateAction.NAME) && + ((MigrateAction) phase.getActions().get(MigrateAction.NAME)).isEnabled() && + phase.getActions().containsKey(AllocateAction.NAME) && + definesAllocationRules((AllocateAction) phase.getActions().get(AllocateAction.NAME)) + ) + .map(Phase::getName) + .collect(Collectors.joining(",")); + if (Strings.hasText(phasesWithConflictingMigrationActions)) { + throw new IllegalArgumentException("phases [" + phasesWithConflictingMigrationActions + "] specify an enabled " + + MigrateAction.NAME + " action and an " + AllocateAction.NAME + " action with allocation rules. specify only a single " + + "data migration in each phase"); + } + } + + private static boolean definesAllocationRules(AllocateAction action) { + return action.getRequire().isEmpty() == false || action.getInclude().isEmpty() == false || action.getExclude().isEmpty() == false; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/step/info/AllocationInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/step/info/AllocationInfo.java new file mode 100644 index 0000000000000..4ee790658f579 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/step/info/AllocationInfo.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ilm.step.info; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Represents the state of an index's shards allocation, including a user friendly message describing the current state. + * It allows to transfer the allocation information to {@link org.elasticsearch.common.xcontent.XContent} using + * {@link #toXContent(XContentBuilder, Params)} + */ +public class AllocationInfo implements ToXContentObject { + + private final long numberOfReplicas; + private final long numberShardsLeftToAllocate; + private final boolean allShardsActive; + private final String message; + + static final ParseField NUMBER_OF_REPLICAS = new ParseField("number_of_replicas"); + static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate"); + static final ParseField ALL_SHARDS_ACTIVE = new ParseField("all_shards_active"); + static final ParseField MESSAGE = new ParseField("message"); + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("allocation_routed_step_info", + a -> new AllocationInfo((long) a[0], (long) a[1], (boolean) a[2], (String) a[3])); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_REPLICAS); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_ALLOCATE); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ALL_SHARDS_ACTIVE); + PARSER.declareString(ConstructingObjectParser.constructorArg(), MESSAGE); + } + + public AllocationInfo(long numberOfReplicas, long numberShardsLeftToAllocate, boolean allShardsActive, String message) { + this.numberOfReplicas = numberOfReplicas; + this.numberShardsLeftToAllocate = numberShardsLeftToAllocate; + this.allShardsActive = allShardsActive; + this.message = message; + } + + /** + * Builds the AllocationInfo representing a cluster state with a routing table that does not have enough shards active for a + * particular index. + */ + public static AllocationInfo waitingForActiveShardsAllocationInfo(long numReplicas) { + return new AllocationInfo(numReplicas, -1, false, + "Waiting for all shard copies to be active"); + } + + /** + * Builds the AllocationInfo representing a cluster state with a routing table that has all the shards active for a particular index + * but there are still {@link #numberShardsLeftToAllocate} left to be allocated. + */ + public static AllocationInfo allShardsActiveAllocationInfo(long numReplicas, long numberShardsLeftToAllocate) { + return new AllocationInfo(numReplicas, numberShardsLeftToAllocate, true, "Waiting for [" + numberShardsLeftToAllocate + + "] shards to be allocated to nodes matching the given filters"); + } + + public long getNumberOfReplicas() { + return numberOfReplicas; + } + + public long getNumberShardsLeftToAllocate() { + return numberShardsLeftToAllocate; + } + + public boolean allShardsActive() { + return allShardsActive; + } + + public String getMessage() { + return message; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(MESSAGE.getPreferredName(), message); + builder.field(SHARDS_TO_ALLOCATE.getPreferredName(), numberShardsLeftToAllocate); + builder.field(ALL_SHARDS_ACTIVE.getPreferredName(), allShardsActive); + builder.field(NUMBER_OF_REPLICAS.getPreferredName(), numberOfReplicas); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(numberOfReplicas, numberShardsLeftToAllocate, allShardsActive); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + AllocationInfo other = (AllocationInfo) obj; + return Objects.equals(numberOfReplicas, other.numberOfReplicas) && + Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate) && + Objects.equals(message, other.message) && + Objects.equals(allShardsActive, other.allShardsActive); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStepTests.java index 2061514aaafcf..62512f9b91d08 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStepTests.java @@ -31,6 +31,9 @@ import java.util.Collections; import java.util.Map; +import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.allShardsActiveAllocationInfo; +import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo; + public class AllocationRoutedStepTests extends AbstractStepTestCase { @Override @@ -117,7 +120,7 @@ public void testRequireConditionMetOnlyOneCopyAllocated() { AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey()); assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, Settings.builder(), indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true))); + new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1))); } public void testExcludeConditionMetOnlyOneCopyAllocated() { @@ -139,7 +142,7 @@ public void testExcludeConditionMetOnlyOneCopyAllocated() { AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey()); assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, Settings.builder(), indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true))); + new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1))); } public void testIncludeConditionMetOnlyOneCopyAllocated() { @@ -161,7 +164,7 @@ public void testIncludeConditionMetOnlyOneCopyAllocated() { AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey()); assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, Settings.builder(), indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true))); + new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1))); } public void testConditionNotMetDueToRelocation() { @@ -190,7 +193,7 @@ public void testConditionNotMetDueToRelocation() { AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey()); assertAllocateStatus(index, 1, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 2, true))); + new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 2))); } public void testExecuteAllocateNotComplete() throws Exception { @@ -227,7 +230,7 @@ public void testExecuteAllocateNotComplete() throws Exception { AllocationRoutedStep step = createRandomInstance(); assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true))); + new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1))); } public void testExecuteAllocateNotCompleteOnlyOneCopyAllocated() throws Exception { @@ -266,7 +269,7 @@ public void testExecuteAllocateNotCompleteOnlyOneCopyAllocated() throws Exceptio AllocationRoutedStep step = new AllocationRoutedStep(randomStepKey(), randomStepKey()); assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, 1, true))); + new ClusterStateWaitStep.Result(false, allShardsActiveAllocationInfo(0, 1))); } public void testExecuteAllocateUnassigned() throws Exception { @@ -304,7 +307,7 @@ public void testExecuteAllocateUnassigned() throws Exception { AllocationRoutedStep step = createRandomInstance(); assertAllocateStatus(index, 2, 0, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(0, -1, false))); + new ClusterStateWaitStep.Result(false, waitingForActiveShardsAllocationInfo(0))); } /** @@ -343,7 +346,7 @@ public void testExecuteReplicasNotAllocatedOnSingleNode() { AllocationRoutedStep step = createRandomInstance(); assertAllocateStatus(index, 1, 1, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, - new ClusterStateWaitStep.Result(false, new AllocationRoutedStep.Info(1, -1, false))); + new ClusterStateWaitStep.Result(false, waitingForActiveShardsAllocationInfo(1))); } public void testExecuteIndexMissing() throws Exception { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java new file mode 100644 index 0000000000000..13ae30adba4f6 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java @@ -0,0 +1,206 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ilm; + + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.UnassignedInfo.Reason; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.xpack.core.DataTier; +import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep.Result; +import org.elasticsearch.xpack.core.ilm.Step.StepKey; +import org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo; + +import java.util.Collections; +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_INCLUDE_SETTING; +import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class DataTierMigrationRoutedStepTests extends AbstractStepTestCase { + + @Override + public DataTierMigrationRoutedStep createRandomInstance() { + StepKey stepKey = randomStepKey(); + StepKey nextStepKey = randomStepKey(); + + return new DataTierMigrationRoutedStep(stepKey, nextStepKey); + } + + @Override + public DataTierMigrationRoutedStep mutateInstance(DataTierMigrationRoutedStep instance) { + StepKey key = instance.getKey(); + StepKey nextKey = instance.getNextStepKey(); + + switch (between(0, 1)) { + case 0: + key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + break; + case 1: + nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + break; + default: + throw new AssertionError("Illegal randomisation branch"); + } + + return new DataTierMigrationRoutedStep(key, nextKey); + } + + @Override + public DataTierMigrationRoutedStep copyInstance(DataTierMigrationRoutedStep instance) { + return new DataTierMigrationRoutedStep(instance.getKey(), instance.getNextStepKey()); + } + + public void testExecuteWithUnassignedShard() { + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)).settings(settings(Version.CURRENT)) + .numberOfShards(1).numberOfReplicas(1).build(); + Index index = indexMetadata.getIndex(); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 1), null, null, true, ShardRoutingState.UNASSIGNED, + new UnassignedInfo(randomFrom(Reason.values()), "the shard is intentionally unassigned"))); + + ClusterState clusterState = + ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build()) + .nodes(DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE))) + ) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + DataTierMigrationRoutedStep step = createRandomInstance(); + Result expectedResult = new Result(false, waitingForActiveShardsAllocationInfo(1)); + + Result actualResult = step.isConditionMet(index, clusterState); + assertThat(actualResult.isComplete(), is(false)); + assertThat(actualResult.getInfomationContext(), is(expectedResult.getInfomationContext())); + } + + public void testExecuteWithPendingShards() { + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)) + .settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM)) + .numberOfShards(1).numberOfReplicas(0).build(); + Index index = indexMetadata.getIndex(); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)); + + ClusterState clusterState = + ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build()) + .nodes(DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE))) + .add(newNode("node2", Collections.singleton(DataTier.DATA_WARM_NODE_ROLE))) + ) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + DataTierMigrationRoutedStep step = createRandomInstance(); + Result expectedResult = new Result(false, new AllocationInfo(0, 1, true, + "[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " + + "[1] shards to be moved to the [data_warm] tier") + ); + + Result actualResult = step.isConditionMet(index, clusterState); + assertThat(actualResult.isComplete(), is(false)); + assertThat(actualResult.getInfomationContext(), is(expectedResult.getInfomationContext())); + } + + public void testExecuteWithPendingShardsAndTargetRoleNotPresentInCluster() { + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)) + .settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM)) + .numberOfShards(1).numberOfReplicas(0).build(); + Index index = indexMetadata.getIndex(); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)); + + ClusterState clusterState = + ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build()) + .nodes(DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE))) + ) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + DataTierMigrationRoutedStep step = createRandomInstance(); + Result expectedResult = new Result(false, new AllocationInfo(0, 1, true, + "[" + index.getName() + "] lifecycle action [" + step.getKey().getAction() + "] waiting for " + + "[1] shards to be moved to the [data_warm] tier but there are currently no [data_warm] nodes in the cluster") + ); + + Result actualResult = step.isConditionMet(index, clusterState); + assertThat(actualResult.isComplete(), is(false)); + assertThat(actualResult.getInfomationContext(), is(expectedResult.getInfomationContext())); + } + + public void testExecuteIndexMissing() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).build(); + + DataTierMigrationRoutedStep step = createRandomInstance(); + + Result actualResult = step.isConditionMet(index, clusterState); + assertThat(actualResult.isComplete(), is(false)); + assertThat(actualResult.getInfomationContext(), is(nullValue())); + } + + public void testExecuteIsComplete() { + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)) + .settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM)) + .numberOfShards(1).numberOfReplicas(0).build(); + Index index = indexMetadata.getIndex(); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node2", true, ShardRoutingState.STARTED)); + + ClusterState clusterState = + ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build()) + .nodes(DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DataTier.DATA_HOT_NODE_ROLE))) + .add(newNode("node2", Collections.singleton(DataTier.DATA_WARM_NODE_ROLE))) + ) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + DataTierMigrationRoutedStep step = createRandomInstance(); + Result result = step.isConditionMet(index, clusterState); + assertThat(result.isComplete(), is(true)); + assertThat(result.getInfomationContext(), is(nullValue())); + } + + public void testExecuteWithGenericDataNodes() { + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 10)) + .settings(settings(Version.CURRENT).put(INDEX_ROUTING_INCLUDE_SETTING.getKey(), DataTier.DATA_WARM)) + .numberOfShards(1).numberOfReplicas(0).build(); + Index index = indexMetadata.getIndex(); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) + .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)); + + ClusterState clusterState = + ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build()) + .nodes(DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + ) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + DataTierMigrationRoutedStep step = createRandomInstance(); + Result result = step.isConditionMet(index, clusterState); + assertThat(result.isComplete(), is(true)); + assertThat(result.getInfomationContext(), is(nullValue())); + } + + private DiscoveryNode newNode(String nodeId, Set roles) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java index 684b32a19a277..2b048b51d743b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java @@ -48,6 +48,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new) )); } @@ -70,6 +71,7 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java index 5e9142c0520ba..97b6ee601fa2f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java @@ -57,6 +57,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new) )); } @@ -78,6 +79,7 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME), SearchableSnapshotAction::parse) )); @@ -136,6 +138,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Null return new UnfollowAction(); case SearchableSnapshotAction.NAME: return new SearchableSnapshotAction(randomAlphaOfLengthBetween(1, 10)); + case MigrateAction.NAME: + return new MigrateAction(false); default: throw new IllegalArgumentException("invalid action [" + action + "]"); }}; @@ -194,6 +198,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l return new UnfollowAction(); case SearchableSnapshotAction.NAME: return new SearchableSnapshotAction(randomAlphaOfLengthBetween(1, 10)); + case MigrateAction.NAME: + return new MigrateAction(false); default: throw new IllegalArgumentException("invalid action [" + action + "]"); }}; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MigrateActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MigrateActionTests.java new file mode 100644 index 0000000000000..67bdb7f03e795 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MigrateActionTests.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ilm; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ilm.Step.StepKey; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE; + +public class MigrateActionTests extends AbstractActionTestCase { + + @Override + protected MigrateAction doParseInstance(XContentParser parser) throws IOException { + return MigrateAction.parse(parser); + } + + @Override + protected MigrateAction createTestInstance() { + return new MigrateAction(); + } + + @Override + protected Reader instanceReader() { + return MigrateAction::new; + } + + public void testToSteps() { + String phase = randomValueOtherThan(DELETE_PHASE, () -> randomFrom(TimeseriesLifecycleType.VALID_PHASES)); + StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), + randomAlphaOfLengthBetween(1, 10)); + { + MigrateAction action = new MigrateAction(); + List steps = action.toSteps(null, phase, nextStepKey); + assertNotNull(steps); + assertEquals(2, steps.size()); + StepKey expectedFirstStepKey = new StepKey(phase, MigrateAction.NAME, MigrateAction.NAME); + StepKey expectedSecondStepKey = new StepKey(phase, MigrateAction.NAME, DataTierMigrationRoutedStep.NAME); + UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0); + DataTierMigrationRoutedStep secondStep = (DataTierMigrationRoutedStep) steps.get(1); + assertEquals(expectedFirstStepKey, firstStep.getKey()); + assertEquals(expectedSecondStepKey, firstStep.getNextStepKey()); + assertEquals(expectedSecondStepKey, secondStep.getKey()); + assertEquals(nextStepKey, secondStep.getNextStepKey()); + } + + { + MigrateAction disabledMigrateAction = new MigrateAction(false); + List steps = disabledMigrateAction.toSteps(null, phase, nextStepKey); + assertEquals(0, steps.size()); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java index 9096bf242bfc2..df88454110257 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java @@ -20,6 +20,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE; +import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE; import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.ORDERED_VALID_COLD_ACTIONS; import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.ORDERED_VALID_DELETE_ACTIONS; import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.ORDERED_VALID_FROZEN_ACTIONS; @@ -31,8 +33,11 @@ import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.VALID_HOT_ACTIONS; import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.VALID_PHASES; import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.VALID_WARM_ACTIONS; +import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.WARM_PHASE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class TimeseriesLifecycleTypeTests extends ESTestCase { @@ -48,6 +53,9 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase { private static final SetPriorityAction TEST_PRIORITY_ACTION = new SetPriorityAction(0); private static final UnfollowAction TEST_UNFOLLOW_ACTION = new UnfollowAction(); private static final SearchableSnapshotAction TEST_SEARCHABLE_SNAPSHOT_ACTION = new SearchableSnapshotAction("repo"); + // keeping the migrate action disabled as otherwise it could conflict with the allocate action if both are randomly selected for the + // same phase + private static final MigrateAction TEST_MIGRATE_ACTION = new MigrateAction(false); public void testValidatePhases() { boolean invalid = randomBoolean(); @@ -186,6 +194,28 @@ public void testValidateDeletePhase() { } } + public void testValidateConflictingDataMigrationConfigurations() { + Map actions = new HashMap<>(); + actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(true)); + actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), TEST_ALLOCATE_ACTION); + List phases = org.elasticsearch.common.collect.List.of( + new Phase(WARM_PHASE, TimeValue.ZERO, actions), new Phase(COLD_PHASE, TimeValue.ZERO, actions) + ); + + Exception validationException = expectThrows(IllegalArgumentException.class, + () -> TimeseriesLifecycleType.INSTANCE.validate(phases)); + assertThat(validationException.getMessage(), equalTo("phases [warm,cold] specify an enabled migrate action and an allocate " + + "action with allocation rules. specify only a single data migration in each phase")); + + // disabling the migrate action makes the phases definition valid as only the allocate action will perform data migration + actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(false)); + try { + TimeseriesLifecycleType.INSTANCE.validate(phases); + } catch (Exception e) { + fail("not expecting a failure for phases that specify one action that migrates data" + e); + } + } + public void testGetOrderedPhases() { Map phaseMap = new HashMap<>(); for (String phaseName : randomSubsetOf(randomIntBetween(0, VALID_PHASES.size()), VALID_PHASES)) { @@ -196,6 +226,18 @@ public void testGetOrderedPhases() { assertTrue(isSorted(TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap), Phase::getName, VALID_PHASES)); } + public void testGetOrderedPhasesInsertsMigrateAction() { + Map phaseMap = new HashMap<>(); + phaseMap.put(HOT_PHASE, new Phase(HOT_PHASE, TimeValue.ZERO, org.elasticsearch.common.collect.Map.of())); + phaseMap.put(WARM_PHASE, new Phase(WARM_PHASE, TimeValue.ZERO, org.elasticsearch.common.collect.Map.of())); + + List orderedPhases = TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap); + assertTrue(isSorted(orderedPhases, Phase::getName, VALID_PHASES)); + Phase warmPhase = orderedPhases.get(1); + assertThat(warmPhase, is(notNullValue())); + assertThat(warmPhase.getActions().get(MigrateAction.NAME), is(notNullValue())); + } + public void testUnfollowInjections() { assertTrue(isUnfollowInjected("hot", RolloverAction.NAME)); assertTrue(isUnfollowInjected("warm", ShrinkAction.NAME)); @@ -590,6 +632,41 @@ public void testGetNextActionName() { exception.getMessage()); } + public void testShouldMigrateDataToTiers() { + { + // the allocate action contain allocation rules + Map actions = new HashMap<>(); + actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(false)); + actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), TEST_ALLOCATE_ACTION); + Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions); + assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false)); + } + + { + // the allocate action only specifies the number of replicas + Map actions = new HashMap<>(); + actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), new AllocateAction(2, null, null, null)); + Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions); + assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(true)); + } + + { + // there's an enabled migrate action specified + Map actions = new HashMap<>(); + actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(true)); + Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions); + assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false)); + } + + { + // there's a disabled migrate action specified + Map actions = new HashMap<>(); + actions.put(TEST_MIGRATE_ACTION.getWriteableName(), new MigrateAction(false)); + Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions); + assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(false)); + } + } + private void assertNextActionName(String phaseName, String currentAction, String expectedNextAction, String... availableActionNames) { Map availableActions = convertActionNamesToActions(availableActionNames); Phase phase = new Phase(phaseName, TimeValue.ZERO, availableActions); @@ -626,7 +703,9 @@ private ConcurrentMap convertActionNamesToActions(Strin case SetPriorityAction.NAME: return new SetPriorityAction(0); case UnfollowAction.NAME: - return new UnfollowAction(); + return new UnfollowAction(); + case MigrateAction.NAME: + return new MigrateAction(true); } return new DeleteAction(); }).collect(Collectors.toConcurrentMap(LifecycleAction::getWriteableName, Function.identity())); @@ -698,6 +777,8 @@ private LifecycleAction getTestAction(String actionName) { return TEST_UNFOLLOW_ACTION; case SearchableSnapshotAction.NAME: return TEST_SEARCHABLE_SNAPSHOT_ACTION; + case MigrateAction.NAME: + return TEST_MIGRATE_ACTION; default: throw new IllegalArgumentException("unsupported timeseries phase action [" + actionName + "]"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/action/PutLifecycleRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/action/PutLifecycleRequestTests.java index 6b1e20c5551df..54a34217a8e4d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/action/PutLifecycleRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/action/PutLifecycleRequestTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyTests; import org.elasticsearch.xpack.core.ilm.LifecycleType; +import org.elasticsearch.xpack.core.ilm.MigrateAction; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction; @@ -75,6 +76,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new) )); } @@ -97,6 +99,7 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME), SearchableSnapshotAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStepInfoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/step/info/AllocationRoutedStepInfoTests.java similarity index 56% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStepInfoTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/step/info/AllocationRoutedStepInfoTests.java index 763f69214911a..4b019bc616ff1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/AllocationRoutedStepInfoTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/step/info/AllocationRoutedStepInfoTests.java @@ -4,25 +4,24 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.core.ilm; +package org.elasticsearch.xpack.core.ilm.step.info; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; -import org.elasticsearch.xpack.core.ilm.AllocationRoutedStep.Info; import java.io.IOException; -public class AllocationRoutedStepInfoTests extends AbstractXContentTestCase { +public class AllocationRoutedStepInfoTests extends AbstractXContentTestCase { @Override - protected Info createTestInstance() { - return new Info(randomNonNegativeLong(), randomNonNegativeLong(), randomBoolean()); + protected AllocationInfo createTestInstance() { + return new AllocationInfo(randomNonNegativeLong(), randomNonNegativeLong(), randomBoolean(), randomAlphaOfLengthBetween(5, 10)); } @Override - protected Info doParseInstance(XContentParser parser) throws IOException { - return Info.PARSER.apply(parser, null); + protected AllocationInfo doParseInstance(XContentParser parser) throws IOException { + return AllocationInfo.PARSER.apply(parser, null); } @Override @@ -36,14 +35,16 @@ public final void testEqualsAndHashcode() { } } - protected final Info copyInstance(Info instance) throws IOException { - return new Info(instance.getActualReplicas(), instance.getNumberShardsLeftToAllocate(), instance.allShardsActive()); + protected final AllocationInfo copyInstance(AllocationInfo instance) { + return new AllocationInfo(instance.getNumberOfReplicas(), instance.getNumberShardsLeftToAllocate(), instance.allShardsActive(), + instance.getMessage()); } - protected Info mutateInstance(Info instance) throws IOException { - long actualReplicas = instance.getActualReplicas(); + protected AllocationInfo mutateInstance(AllocationInfo instance) throws IOException { + long actualReplicas = instance.getNumberOfReplicas(); long shardsToAllocate = instance.getNumberShardsLeftToAllocate(); boolean allShardsActive = instance.allShardsActive(); + String message = instance.getMessage(); switch (between(0, 2)) { case 0: shardsToAllocate += between(1, 20); @@ -54,10 +55,13 @@ protected Info mutateInstance(Info instance) throws IOException { case 2: actualReplicas += between(1, 20); break; + case 3: + message = randomValueOtherThan(message, () -> randomAlphaOfLengthBetween(5, 10)); + break; default: throw new AssertionError("Illegal randomisation branch"); } - return new Info(actualReplicas, shardsToAllocate, allShardsActive); + return new AllocationInfo(actualReplicas, shardsToAllocate, allShardsActive, message); } } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 00f29ea10b411..fef7ce257b03e 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleAction; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.MigrateAction; import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; @@ -675,8 +676,21 @@ public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception request.addParameter("level", "shards"); }); - // assign the policy that'll attempt to shrink the index - createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards)); + // assign the policy that'll attempt to shrink the index (disabling the migrate action as it'll otherwise wait for + // all shards to be active and we want that to happen as part of the shrink action) + MigrateAction migrateAction = new MigrateAction(false); + ShrinkAction shrinkAction = new ShrinkAction(expectedFinalShards); + Phase phase = new Phase("warm", TimeValue.ZERO, org.elasticsearch.common.collect.Map.of( + migrateAction.getWriteableName(), migrateAction, shrinkAction.getWriteableName(), shrinkAction) + ); + LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, singletonMap(phase.getName(), phase)); + XContentBuilder builder = jsonBuilder(); + lifecyclePolicy.toXContent(builder, null); + final StringEntity entity = new StringEntity( + "{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON); + Request putPolicyRequest = new Request("PUT", "_ilm/policy/" + policy); + putPolicyRequest.setEntity(entity); + client().performRequest(putPolicyRequest); updatePolicy(index, policy); assertTrue("ILM did not start retrying the set-single-node-allocation step", waitUntil(() -> { diff --git a/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataTiersMigrationsTests.java b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataTiersMigrationsTests.java new file mode 100644 index 0000000000000..55429decaa3e4 --- /dev/null +++ b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataTiersMigrationsTests.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm; + +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.core.DataTier; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.DataTierMigrationRoutedStep; +import org.elasticsearch.xpack.core.ilm.ExplainLifecycleRequest; +import org.elasticsearch.xpack.core.ilm.ExplainLifecycleResponse; +import org.elasticsearch.xpack.core.ilm.IndexLifecycleExplainResponse; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction; +import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.test.NodeRoles.onlyRole; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class DataTiersMigrationsTests extends ESIntegTestCase { + + private String policy; + private String managedIndex; + + @Before + public void refreshDataStreamAndPolicy() { + policy = "policy-" + randomAlphaOfLength(5); + managedIndex = "index-" + randomAlphaOfLengthBetween(10, 15).toLowerCase(Locale.ROOT); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s"); + settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false); + return settings.build(); + } + @Override + protected Settings transportClientSettings() { + Settings.Builder settings = Settings.builder().put(super.transportClientSettings()); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + return settings.build(); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + public static Settings hotNode(final Settings settings) { + return onlyRole(settings, DataTier.DATA_HOT_NODE_ROLE); + } + + public static Settings warmNode(final Settings settings) { + return onlyRole(settings, DataTier.DATA_WARM_NODE_ROLE); + } + + public static Settings coldNode(final Settings settings) { + return onlyRole(settings, DataTier.DATA_COLD_NODE_ROLE); + } + + public void testIndexDataTierMigration() throws Exception { + internalCluster().startMasterOnlyNodes(1, Settings.EMPTY); + logger.info("starting hot data node"); + internalCluster().startNode(hotNode(Settings.EMPTY)); + + Phase hotPhase = new Phase("hot", TimeValue.ZERO, Collections.emptyMap()); + Phase warmPhase = new Phase("warm", TimeValue.ZERO, Collections.emptyMap()); + Phase coldPhase = new Phase("cold", TimeValue.ZERO, Collections.emptyMap()); + LifecyclePolicy lifecyclePolicy = new LifecyclePolicy( + policy, org.elasticsearch.common.collect.Map.of("hot", hotPhase, "warm", warmPhase, "cold", coldPhase) + ); + PutLifecycleAction.Request putLifecycleRequest = new PutLifecycleAction.Request(lifecyclePolicy); + PutLifecycleAction.Response putLifecycleResponse = client().execute(PutLifecycleAction.INSTANCE, putLifecycleRequest).get(); + assertAcked(putLifecycleResponse); + + Settings settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, policy).build(); + CreateIndexResponse res = client().admin().indices().prepareCreate(managedIndex).setSettings(settings).get(); + assertTrue(res.isAcknowledged()); + + assertBusy(() -> { + ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex); + ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE, + explainRequest).get(); + + IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex); + assertThat(indexLifecycleExplainResponse.getPhase(), is("warm")); + assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME)); + }); + + logger.info("starting warm data node"); + internalCluster().startNode(warmNode(Settings.EMPTY)); + assertBusy(() -> { + ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex); + ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE, + explainRequest).get(); + + IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex); + assertThat(indexLifecycleExplainResponse.getPhase(), is("cold")); + assertThat(indexLifecycleExplainResponse.getStep(), is(DataTierMigrationRoutedStep.NAME)); + }); + + logger.info("starting cold data node"); + internalCluster().startNode(coldNode(Settings.EMPTY)); + + // wait for lifecycle to complete in the cold phase after the index has been migrated to the cold node + assertBusy(() -> { + ExplainLifecycleRequest explainRequest = new ExplainLifecycleRequest().indices(managedIndex); + ExplainLifecycleResponse explainResponse = client().execute(ExplainLifecycleAction.INSTANCE, + explainRequest).get(); + + IndexLifecycleExplainResponse indexLifecycleExplainResponse = explainResponse.getIndexResponses().get(managedIndex); + assertThat(indexLifecycleExplainResponse.getPhase(), is("cold")); + assertThat(indexLifecycleExplainResponse.getStep(), is("complete")); + }); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index d5b85e1e8027e..37a5bb8b4f264 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleAction; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.LifecycleType; +import org.elasticsearch.xpack.core.ilm.MigrateAction; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction; @@ -249,7 +250,9 @@ public List getNa new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME), - SearchableSnapshotAction::parse) + SearchableSnapshotAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), + MigrateAction::parse) ); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleMetadataTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleMetadataTests.java index 6c942b4d4ba95..2a03ab8e9daae 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleMetadataTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleMetadataTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleType; +import org.elasticsearch.xpack.core.ilm.MigrateAction; import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; @@ -91,6 +92,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new) )); } @@ -112,6 +114,7 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME), SearchableSnapshotAction::parse) )); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index ca2c2fc175642..d8777c021a683 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyTests; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.MigrateAction; import org.elasticsearch.xpack.core.ilm.MockAction; import org.elasticsearch.xpack.core.ilm.MockStep; import org.elasticsearch.xpack.core.ilm.OperationMode; @@ -171,7 +172,7 @@ public void testRunPolicyErrorStep() { Phase phase = policy.getPhases().get(phaseName); PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong()); String phaseJson = Strings.toString(phaseExecutionInfo); - LifecycleAction action = randomFrom(phase.getActions().values()); + LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values())); Step step = randomFrom(action.toSteps(new NoOpClient(threadPool), phaseName, null)); StepKey stepKey = step.getKey(); @@ -729,7 +730,7 @@ public void testGetCurrentStep() { Phase phase = policy.getPhases().get(phaseName); PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong()); String phaseJson = Strings.toString(pei); - LifecycleAction action = randomFrom(phase.getActions().values()); + LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values())); Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY)); Settings indexSettings = Settings.builder() .put("index.number_of_shards", 1) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java index 29b23e2d3ed18..f2e8928828282 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyTests; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.MigrateAction; import org.elasticsearch.xpack.core.ilm.MockStep; import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.Phase; @@ -91,7 +92,7 @@ public void testGetStep() { Phase phase = policy.getPhases().get(phaseName); PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong()); String phaseJson = Strings.toString(pei); - LifecycleAction action = randomFrom(phase.getActions().values()); + LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values())); Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY)); LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); lifecycleState.setPhaseDefinition(phaseJson); @@ -159,7 +160,7 @@ public void testGetStepUnknownStepKey() { Phase phase = policy.getPhases().get(phaseName); PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong()); String phaseJson = Strings.toString(pei); - LifecycleAction action = randomFrom(phase.getActions().values()); + LifecycleAction action = randomValueOtherThan(new MigrateAction(false), () -> randomFrom(phase.getActions().values())); Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY)); LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); lifecycleState.setPhaseDefinition(phaseJson);