Skip to content

Commit

Permalink
ILM: searchable snapshot executes before migrate in cold/frozen (#68861)
Browse files Browse the repository at this point in the history
This moves the execution of the `searchable_snapshot` action before the
`migrate` action in the `cold` and `frozen` phases for more efficient
data migration (i.e. mounting the index as a searchable snapshot directly
on the target tier).

Now that `searchable_snapshot` can precede other actions in the same phase
(e.g. in `frozen` it is followed by `migrate`), we need to allow the mounted
index to resume executing the ILM policy starting with a step that's part
of a new action (i.e. `migrate`).

This adds support to resume the execution of the mounted index from another
action.

In older versions, the execution would resume from the `PhaseCompleteStep`
because `searchable_snapshot` was the last action in a phase; this was handled
as a special case in the `CopyExecutionStateStep`. This generalises the
`CopyExecutionStateStep` to be able to resume from any `StepKey`.
  • Loading branch information
andreidan authored Feb 15, 2021
1 parent 81afc66 commit 800ae51
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 50 deletions.
5 changes: 5 additions & 0 deletions docs/reference/ilm/actions/ilm-migrate.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ replicas, {ilm-init} reduces the number of replicas before migrating the index.
To prevent automatic migration without specifying allocation options,
you can explicitly include the migrate action and set the enabled option to `false`.

If the `cold` phase defines a <<ilm-searchable-snapshot, searchable snapshot action>>, the `migrate`
action will not be injected automatically in the `cold` phase because the managed index will be
mounted directly on the target tier using the same <<tier-preference-allocation-filter, _tier_preference>>
infrastructure that the `migrate` action configures.

In the warm phase, the `migrate` action sets <<tier-preference-allocation-filter, `index.routing.allocation.include._tier_preference`>>
to `data_warm,data_hot`. This moves the index to nodes in the
<<warm-tier, warm tier>>. If there are no nodes in the warm tier, it falls back to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
/**
* Copies the execution state data from one index to another, typically after a
* new index has been created. As part of the execution state copy it will set the target index
* "current step" to the provided step name (part of the same phase and action as the current step's, unless
* the "complete" step is configured in which case the action will be changed to "complete" as well)
* "current step" to the provided target next step {@link org.elasticsearch.xpack.core.ilm.Step.StepKey}.
*
* Useful for actions such as shrink.
*/
Expand All @@ -32,20 +31,20 @@ public class CopyExecutionStateStep extends ClusterStateActionStep {
private static final Logger logger = LogManager.getLogger(CopyExecutionStateStep.class);

private final String targetIndexPrefix;
private final String targetNextStepName;
private final StepKey targetNextStepKey;

public CopyExecutionStateStep(StepKey key, StepKey nextStepKey, String targetIndexPrefix, String targetNextStepName) {
public CopyExecutionStateStep(StepKey key, StepKey nextStepKey, String targetIndexPrefix, StepKey targetNextStepKey) {
super(key, nextStepKey);
this.targetIndexPrefix = targetIndexPrefix;
this.targetNextStepName = targetNextStepName;
this.targetNextStepKey = targetNextStepKey;
}

String getTargetIndexPrefix() {
return targetIndexPrefix;
}

String getTargetNextStepName() {
return targetNextStepName;
StepKey getTargetNextStepKey() {
return targetNextStepKey;
}

@Override
Expand All @@ -69,20 +68,17 @@ public ClusterState performAction(Index index, ClusterState clusterState) {
"] to [" + targetIndexName + "] as target index does not exist");
}

String phase = targetNextStepKey.getPhase();
String action = targetNextStepKey.getAction();
String step = targetNextStepKey.getName();
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetadata);
String phase = lifecycleState.getPhase();
String action = lifecycleState.getAction();
long lifecycleDate = lifecycleState.getLifecycleDate();

LifecycleExecutionState.Builder relevantTargetCustomData = LifecycleExecutionState.builder();
relevantTargetCustomData.setIndexCreationDate(lifecycleDate);
relevantTargetCustomData.setAction(action);
relevantTargetCustomData.setPhase(phase);
relevantTargetCustomData.setStep(targetNextStepName);
if (targetNextStepName.equals(PhaseCompleteStep.NAME)) {
relevantTargetCustomData.setAction(PhaseCompleteStep.NAME);
} else {
relevantTargetCustomData.setAction(action);
}
relevantTargetCustomData.setStep(step);
relevantTargetCustomData.setSnapshotRepository(lifecycleState.getSnapshotRepository());
relevantTargetCustomData.setSnapshotName(lifecycleState.getSnapshotName());
relevantTargetCustomData.setSnapshotIndexName(lifecycleState.getSnapshotIndexName());
Expand All @@ -107,11 +103,11 @@ public boolean equals(Object o) {
}
CopyExecutionStateStep that = (CopyExecutionStateStep) o;
return Objects.equals(targetIndexPrefix, that.targetIndexPrefix) &&
Objects.equals(targetNextStepName, that.targetNextStepName);
Objects.equals(targetNextStepKey, that.targetNextStepKey);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), targetIndexPrefix, targetNextStepName);
return Objects.hash(super.hashCode(), targetIndexPrefix, targetNextStepKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -25,6 +28,8 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.FROZEN_PHASE;

/**
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
* {@link org.elasticsearch.xpack.core.DataTier}s.
Expand All @@ -33,6 +38,8 @@ public class MigrateAction implements LifecycleAction {
public static final String NAME = "migrate";
public static final ParseField ENABLED_FIELD = new ParseField("enabled");

private static final Logger logger = LogManager.getLogger(MigrateAction.class);
static final String CONDITIONAL_SKIP_MIGRATE_STEP = BranchingStep.NAME + "-check-skip-action";
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
private static final List<String> FROZEN_TO_HOT_TIERS =
List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);
Expand Down Expand Up @@ -92,22 +99,44 @@ public boolean isSafeAction() {
@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
if (enabled) {
StepKey preMigrateBranchingKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_MIGRATE_STEP);
StepKey migrationKey = new StepKey(phase, NAME, NAME);
StepKey migrationRoutedKey = new StepKey(phase, NAME, DataTierMigrationRoutedStep.NAME);

Settings.Builder migrationSettings = Settings.builder();
String dataTierName = "data_" + phase;
assert DataTier.validTierName(dataTierName) : "invalid data tier name:" + dataTierName;
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(dataTierName));
String targetTier = "data_" + phase;
assert DataTier.validTierName(targetTier) : "invalid data tier name:" + targetTier;

BranchingStep conditionalSkipActionStep = new BranchingStep(preMigrateBranchingKey, migrationKey, nextStepKey,
(index, clusterState) -> {
if (skipMigrateAction(phase, clusterState.metadata().index(index))) {
String policyName =
LifecycleSettings.LIFECYCLE_NAME_SETTING.get(clusterState.metadata().index(index).getSettings());
logger.debug("[{}] action is configured for index [{}] in policy [{}] which is already mounted as a searchable " +
"snapshot. skipping this action", MigrateAction.NAME, index.getName(), policyName);
return true;
}

// don't skip the migrate action as the index is not mounted as searchable snapshot or we're in the frozen phase
return false;
});
migrationSettings.put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, getPreferredTiersConfiguration(targetTier));
UpdateSettingsStep updateMigrationSettingStep = new UpdateSettingsStep(migrationKey, migrationRoutedKey, client,
migrationSettings.build());
DataTierMigrationRoutedStep migrationRoutedStep = new DataTierMigrationRoutedStep(migrationRoutedKey, nextStepKey);
return Arrays.asList(updateMigrationSettingStep, migrationRoutedStep);
return Arrays.asList(conditionalSkipActionStep, updateMigrationSettingStep, migrationRoutedStep);
} else {
return List.of();
}
}

/**
 * Decides whether the migrate action should be skipped for the given index.
 *
 * The action is skipped when the index settings carry a value for
 * {@link LifecycleSettings#SNAPSHOT_INDEX_NAME} (i.e. the index is mounted as a searchable
 * snapshot, which already configures the tier allocation preference), except in the frozen
 * phase where the migrate action is always executed.
 *
 * @param phase         the name of the phase the migrate action is part of
 * @param indexMetadata the metadata of the index being evaluated
 * @return {@code true} if the migrate action should be skipped, {@code false} otherwise
 */
static boolean skipMigrateAction(String phase, IndexMetadata indexMetadata) {
    final String snapshotIndexName = indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME);
    if (snapshotIndexName == null) {
        // not mounted as a searchable snapshot, the migrate action must run
        return false;
    }
    // a mounted index only skips the migrate action outside of the frozen phase
    final boolean inFrozenPhase = phase.equals(FROZEN_PHASE);
    return inFrozenPhase == false;
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
client, getRestoredIndexPrefix(mountSnapshotKey), getConcreteStorageType(mountSnapshotKey));
WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(waitForGreenRestoredIndexKey,
copyMetadataKey, ClusterHealthStatus.GREEN, getRestoredIndexPrefix(waitForGreenRestoredIndexKey));
// a policy with only the cold phase will have a null "nextStepKey", hence the "null" nextStepKey passed in below when that's the
// case
CopyExecutionStateStep copyMetadataStep = new CopyExecutionStateStep(copyMetadataKey, copyLifecyclePolicySettingKey,
getRestoredIndexPrefix(copyMetadataKey), nextStepKey != null ? nextStepKey.getName() : "null");
getRestoredIndexPrefix(copyMetadataKey), nextStepKey);
CopySettingsStep copySettingsStep = new CopySettingsStep(copyLifecyclePolicySettingKey, dataStreamCheckBranchingKey,
getRestoredIndexPrefix(copyLifecyclePolicySettingKey), LifecycleSettings.LIFECYCLE_NAME);
BranchingStep isDataStreamBranchingStep = new BranchingStep(dataStreamCheckBranchingKey, swapAliasesKey, replaceDataStreamIndexKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
SHRUNKEN_INDEX_PREFIX);
ShrunkShardsAllocatedStep allocated = new ShrunkShardsAllocatedStep(enoughShardsKey, copyMetadataKey, SHRUNKEN_INDEX_PREFIX);
CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, dataStreamCheckBranchingKey,
SHRUNKEN_INDEX_PREFIX, ShrunkenIndexCheckStep.NAME);
SHRUNKEN_INDEX_PREFIX, isShrunkIndexKey);
// by the time we get to this step we have 2 indices, the source and the shrunken one. we now need to choose an index
// swapping strategy such that the shrunken index takes the place of the source index (which is also deleted).
// if the source index is part of a data stream it's a matter of replacing it with the shrunken index one in the data stream and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class TimeseriesLifecycleType implements LifecycleType {
static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
static final List<String> ORDERED_VALID_COLD_ACTIONS;
static final List<String> ORDERED_VALID_FROZEN_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
static final List<String> ORDERED_VALID_FROZEN_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME,
ReadOnlyAction.NAME, SearchableSnapshotAction.NAME, AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME);
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
static final Set<String> VALID_HOT_ACTIONS;
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
Expand All @@ -67,13 +67,13 @@ public class TimeseriesLifecycleType implements LifecycleType {
if (RollupV2.isEnabled()) {
ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
ReadOnlyAction.NAME, RollupILMAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
MigrateAction.NAME, FreezeAction.NAME, RollupILMAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, SearchableSnapshotAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME, RollupILMAction.NAME);
} else {
ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
ReadOnlyAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, SearchableSnapshotAction.NAME,
AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME);
}
VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
Expand Down Expand Up @@ -133,6 +133,13 @@ static boolean shouldInjectMigrateStepForPhase(Phase phase) {
}
}

if (phase.getActions().get(SearchableSnapshotAction.NAME) != null && phase.getName().equals(FROZEN_PHASE) == false) {
// the `searchable_snapshot` action defines migration rules itself, so no need to inject a migrate action, unless we're in the
// frozen phase (as the migrate action would also include the `data_frozen` role which is not guaranteed to be included by all
// types of searchable snapshots)
return false;
}

MigrateAction migrateAction = (MigrateAction) phase.getActions().get(MigrateAction.NAME);
// if the user configured the {@link MigrateAction} already we won't automatically configure it
return migrateAction == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ protected CopyExecutionStateStep createRandomInstance() {
StepKey stepKey = randomStepKey();
StepKey nextStepKey = randomStepKey();
String shrunkIndexPrefix = randomAlphaOfLength(10);
String nextStepName = randomStepKey().getName();
return new CopyExecutionStateStep(stepKey, nextStepKey, shrunkIndexPrefix, nextStepName);
StepKey targetNextStepKey = randomStepKey();
return new CopyExecutionStateStep(stepKey, nextStepKey, shrunkIndexPrefix, targetNextStepKey);
}

@Override
protected CopyExecutionStateStep mutateInstance(CopyExecutionStateStep instance) {
StepKey key = instance.getKey();
StepKey nextKey = instance.getNextStepKey();
String shrunkIndexPrefix = instance.getTargetIndexPrefix();
String nextStepName = instance.getTargetNextStepName();
StepKey targetNextStepKey = instance.getTargetNextStepKey();

switch (between(0, 2)) {
case 0:
Expand All @@ -48,19 +48,20 @@ protected CopyExecutionStateStep mutateInstance(CopyExecutionStateStep instance)
shrunkIndexPrefix += randomAlphaOfLength(5);
break;
case 3:
nextStepName = randomAlphaOfLengthBetween(1, 10);
targetNextStepKey = new StepKey(targetNextStepKey.getPhase(), targetNextStepKey.getAction(),
targetNextStepKey.getName() + randomAlphaOfLength(5));
break;
default:
throw new AssertionError("Illegal randomisation branch");
}

return new CopyExecutionStateStep(key, nextKey, shrunkIndexPrefix, nextStepName);
return new CopyExecutionStateStep(key, nextKey, shrunkIndexPrefix, targetNextStepKey);
}

@Override
protected CopyExecutionStateStep copyInstance(CopyExecutionStateStep instance) {
return new CopyExecutionStateStep(instance.getKey(), instance.getNextStepKey(), instance.getTargetIndexPrefix(),
instance.getTargetNextStepName());
instance.getTargetNextStepKey());
}

public void testPerformAction() {
Expand Down Expand Up @@ -89,10 +90,11 @@ public void testPerformAction() {
LifecycleExecutionState newIndexData = LifecycleExecutionState
.fromIndexMetadata(newClusterState.metadata().index(step.getTargetIndexPrefix() + indexName));

StepKey targetNextStepKey = step.getTargetNextStepKey();
assertEquals(newIndexData.getLifecycleDate(), oldIndexData.getLifecycleDate());
assertEquals(newIndexData.getPhase(), oldIndexData.getPhase());
assertEquals(newIndexData.getAction(), oldIndexData.getAction());
assertEquals(newIndexData.getStep(), step.getTargetNextStepName());
assertEquals(newIndexData.getPhase(), targetNextStepKey.getPhase());
assertEquals(newIndexData.getAction(), targetNextStepKey.getAction());
assertEquals(newIndexData.getStep(), targetNextStepKey.getName());
assertEquals(newIndexData.getSnapshotRepository(), oldIndexData.getSnapshotRepository());
assertEquals(newIndexData.getSnapshotName(), oldIndexData.getSnapshotName());
}
Expand Down
Loading

0 comments on commit 800ae51

Please sign in to comment.