Skip to content

Commit

Permalink
Generate a correct consistent downsample target name in downsample il…
Browse files Browse the repository at this point in the history
…m action.

Bring back generate downsample step that is used to prepare the lifecycle state and sets the name of the target downsample index to be used. A number of steps will use this to execute various downsample related operations.

The generate downsample index name step produces a consistent name. And also takes into account whether the source index was already downsampled, so a logical name is generated.
  • Loading branch information
martijnvg committed Aug 15, 2023
1 parent 4a209b4 commit 66f2383
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.xpack.core.ilm.DownsampleStep.generateDownsampleIndexName;

/**
* A {@link LifecycleAction} which calls {@link org.elasticsearch.xpack.core.downsample.DownsampleAction} on an index
Expand All @@ -44,7 +43,6 @@ public class DownsampleAction implements LifecycleAction {
public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-";
public static final String CONDITIONAL_TIME_SERIES_CHECK_KEY = BranchingStep.NAME + "-on-timeseries-check";
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
public static final String GENERATE_DOWNSAMPLE_STEP_NAME = "generate-downsampled-index-name";
public static final TimeValue DEFAULT_WAIT_TIMEOUT = new TimeValue(1, TimeUnit.DAYS);
private static final ParseField FIXED_INTERVAL_FIELD = new ParseField(DownsampleConfig.FIXED_INTERVAL);
private static final ParseField WAIT_TIMEOUT_FIELD = new ParseField("wait_timeout");
Expand Down Expand Up @@ -137,7 +135,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyStep.NAME);
StepKey cleanupDownsampleIndexKey = new StepKey(phase, NAME, CleanupTargetIndexStep.NAME);
StepKey generateDownsampleIndexNameKey = new StepKey(phase, NAME, GENERATE_DOWNSAMPLE_STEP_NAME);
StepKey generateDownsampleIndexNameKey = new StepKey(phase, NAME, DownsamplePrepareLifeCycleStateStep.NAME);
StepKey downsampleKey = new StepKey(phase, NAME, DownsampleStep.NAME);
StepKey waitForDownsampleIndexKey = new StepKey(phase, NAME, WaitForIndexColorStep.NAME);
StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
Expand Down Expand Up @@ -166,7 +164,7 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, readOnlyKey, client);

// Mark source index as read-only
ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, downsampleKey, client);
ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, generateDownsampleIndexNameKey, client);

// Before the downsample action was retry-able, we used to generate a unique downsample index name and delete the previous index in
// case a failure occurred. The downsample action can now retry execution in case of failure and start where it left off, so no
Expand All @@ -175,9 +173,12 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
// upgrade was performed resume the ILM execution and complete the downsample action after upgrade.)
NoopStep cleanupDownsampleIndexStep = new NoopStep(cleanupDownsampleIndexKey, downsampleKey);

// Before a random downsample index name was generated.
// (this noop step allows ILM to resume after a rollover without failing)
NoopStep generateDownsampleIndexNameStep = new NoopStep(generateDownsampleIndexNameKey, downsampleKey);
// Prepare the lifecycleState by generating the name of the target index, that subsequest steps will use.
DownsamplePrepareLifeCycleStateStep generateDownsampleIndexNameStep = new DownsamplePrepareLifeCycleStateStep(
generateDownsampleIndexNameKey,
downsampleKey,
fixedInterval
);

// Here is where the actual downsample action takes place
DownsampleStep downsampleStep = new DownsampleStep(downsampleKey, waitForDownsampleIndexKey, client, fixedInterval, waitTimeout);
Expand All @@ -191,22 +192,22 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
waitForDownsampleIndexKey,
copyMetadataKey,
ClusterHealthStatus.YELLOW,
(indexName, lifecycleState) -> generateDownsampleIndexName(indexName, fixedInterval)
(indexName, lifecycleState) -> lifecycleState.downsampleIndexName()
),
cleanupDownsampleIndexKey
);

CopyExecutionStateStep copyExecutionStateStep = new CopyExecutionStateStep(
copyMetadataKey,
copyIndexLifecycleKey,
(indexName, lifecycleState) -> generateDownsampleIndexName(indexName, fixedInterval),
(indexName, lifecycleState) -> lifecycleState.downsampleIndexName(),
nextStepKey
);

CopySettingsStep copyLifecycleSettingsStep = new CopySettingsStep(
copyIndexLifecycleKey,
dataStreamCheckBranchingKey,
(indexName, lifecycleState) -> generateDownsampleIndexName(indexName, fixedInterval),
(indexName, lifecycleState) -> lifecycleState.downsampleIndexName(),
LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey()
);

Expand All @@ -228,15 +229,15 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep(
replaceDataStreamIndexKey,
deleteIndexKey,
(sourceIndexName, lifecycleState) -> generateDownsampleIndexName(sourceIndexName, fixedInterval)
(sourceIndexName, lifecycleState) -> lifecycleState.downsampleIndexName()
);
DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, nextStepKey, client);

SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep(
swapAliasesKey,
nextStepKey,
client,
(indexName, lifecycleState) -> generateDownsampleIndexName(indexName, fixedInterval),
(indexName, lifecycleState) -> lifecycleState.downsampleIndexName(),
false
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDEX_PREFIX;

public class DownsamplePrepareLifeCycleStateStep extends ClusterStateActionStep {

private static final Logger LOGGER = LogManager.getLogger(DownsamplePrepareLifeCycleStateStep.class);
public static final String NAME = "generate-downsampled-index-name";
private final DateHistogramInterval fixedInterval;

public DownsamplePrepareLifeCycleStateStep(StepKey key, StepKey nextStepKey, DateHistogramInterval fixedInterval) {
super(key, nextStepKey);
this.fixedInterval = fixedInterval;
}

@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
IndexMetadata indexMetadata = clusterState.metadata().index(index);
if (indexMetadata == null) {
// Index must have been since deleted, ignore it
LOGGER.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().action(), index.getName());
return clusterState;
}

LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState();

LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState.builder(lifecycleState);
final String downsampleIndexName = generateDownsampleIndexName(indexMetadata, fixedInterval);
newLifecycleState.setDownsampleIndexName(downsampleIndexName);

return LifecycleExecutionStateUtils.newClusterStateWithLifecycleState(
clusterState,
indexMetadata.getIndex(),
newLifecycleState.build()
);
}

@Override
public boolean isRetryable() {
return true;
}

static String generateDownsampleIndexName(IndexMetadata sourceIndexMetadata, DateHistogramInterval fixedInterval) {
String downsampleSourceName = sourceIndexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY);
String sourceIndexName;
if (downsampleSourceName != null) {
sourceIndexName = downsampleSourceName;
} else {
sourceIndexName = sourceIndexMetadata.getIndex().getName();
}
return DOWNSAMPLED_INDEX_PREFIX + sourceIndexName + "-" + fixedInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import java.util.Objects;

import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDEX_PREFIX;

/**
* ILM step that invokes the downsample action for an index using a {@link DateHistogramInterval}. The downsample
* index name is retrieved from the lifecycle state {@link LifecycleExecutionState#downsampleIndexName()}
Expand Down Expand Up @@ -67,8 +65,7 @@ public void performAction(

final String policyName = indexMetadata.getLifecyclePolicyName();
final String indexName = indexMetadata.getIndex().getName();
final String downsampleIndexName = generateDownsampleIndexName(indexName, fixedInterval);

final String downsampleIndexName = lifecycleState.downsampleIndexName();
IndexMetadata downsampleIndexMetadata = currentState.metadata().index(downsampleIndexName);
if (downsampleIndexMetadata != null) {
IndexMetadata.DownsampleTaskStatus downsampleIndexStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(
Expand Down Expand Up @@ -127,7 +124,4 @@ public boolean equals(Object obj) {
return super.equals(obj) && Objects.equals(fixedInterval, other.fixedInterval) && Objects.equals(waitTimeout, other.waitTimeout);
}

static String generateDownsampleIndexName(String sourceIndexName, DateHistogramInterval fixedInterval) {
return DOWNSAMPLED_INDEX_PREFIX + sourceIndexName + "-" + fixedInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.elasticsearch.xpack.core.ilm.DownsampleAction.CONDITIONAL_DATASTREAM_CHECK_KEY;
import static org.elasticsearch.xpack.core.ilm.DownsampleAction.CONDITIONAL_TIME_SERIES_CHECK_KEY;
import static org.elasticsearch.xpack.core.ilm.DownsampleAction.GENERATE_DOWNSAMPLE_STEP_NAME;
import static org.hamcrest.Matchers.equalTo;

public class DownsampleActionTests extends AbstractActionTestCase<DownsampleAction> {
Expand Down Expand Up @@ -84,14 +83,14 @@ public void testToSteps() {

assertTrue(steps.get(3) instanceof ReadOnlyStep);
assertThat(steps.get(3).getKey().name(), equalTo(ReadOnlyStep.NAME));
assertThat(steps.get(3).getNextStepKey().name(), equalTo(DownsampleStep.NAME));
assertThat(steps.get(3).getNextStepKey().name(), equalTo(DownsamplePrepareLifeCycleStateStep.NAME));

assertTrue(steps.get(4) instanceof NoopStep);
assertThat(steps.get(4).getKey().name(), equalTo(CleanupTargetIndexStep.NAME));
assertThat(steps.get(4).getNextStepKey().name(), equalTo(DownsampleStep.NAME));

assertTrue(steps.get(5) instanceof NoopStep);
assertThat(steps.get(5).getKey().name(), equalTo(GENERATE_DOWNSAMPLE_STEP_NAME));
assertTrue(steps.get(5) instanceof DownsamplePrepareLifeCycleStateStep);
assertThat(steps.get(5).getKey().name(), equalTo(DownsamplePrepareLifeCycleStateStep.NAME));
assertThat(steps.get(5).getNextStepKey().name(), equalTo(DownsampleStep.NAME));

assertTrue(steps.get(6) instanceof DownsampleStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.rest.action.admin.indices.RestPutIndexTemplateAction;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.ilm.CheckNotDataStreamWriteIndexStep;
Expand Down Expand Up @@ -314,6 +315,82 @@ public void testRollupNonTSIndex() throws Exception {
assertTrue("Source index should not have been deleted", indexExists(index));
}

public void testDownsampleTwice() throws Exception {
// Create the ILM policy
Request request = new Request("PUT", "_ilm/policy/" + policy);
request.setJsonEntity("""
{
"policy": {
"phases": {
"warm": {
"actions": {
"downsample": {
"fixed_interval" : "1m"
}
}
},
"cold": {
"actions": {
"downsample": {
"fixed_interval" : "1h"
}
}
}
}
}
}
""");
client().performRequest(request);

// Create a template
Request createIndexTemplateRequest = new Request("POST", "/_index_template/" + dataStream);
createIndexTemplateRequest.setJsonEntity(Strings.format(TEMPLATE, dataStream, policy));
assertOK(client().performRequest(createIndexTemplateRequest));

String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now());
index(client(), dataStream, true, null, "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5));

var getDataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStream));
String firstBackingIndex = ObjectPath.eval("data_streams.0.indices.0.index_name", responseAsMap(getDataStreamResponse));
logger.info("firstBackingIndex: {}", firstBackingIndex);
assertBusy(
() -> assertThat(
"index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore",
explainIndex(client(), firstBackingIndex).get("step"),
is(CheckNotDataStreamWriteIndexStep.NAME)
),
30,
TimeUnit.SECONDS
);

// Manual rollover the original index such that it's not the write index in the data stream anymore
rolloverMaxOneDocCondition(client(), dataStream);

String downsampleIndexName = "downsample-" + firstBackingIndex + "-1m";
String downsampleOfDownsampleIndexName = "downsample-" + firstBackingIndex + "-1h";
try {
assertBusy(() -> {
assertThat(indexExists(downsampleOfDownsampleIndexName), is(true));
assertThat(indexExists(firstBackingIndex), is(false));
assertThat(indexExists(downsampleIndexName), is(false));

Map<String, Object> settings = getOnlyIndexSettings(client(), downsampleOfDownsampleIndexName);
assertEquals(firstBackingIndex, settings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.getKey()));
assertEquals(DownsampleTaskStatus.SUCCESS.toString(), settings.get(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()));
assertEquals(policy, settings.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey()));
}, 60, TimeUnit.SECONDS);
} catch (AssertionError ae) {
if (indexExists(firstBackingIndex)) {
logger.error("Index [{}] ilm explain {}", firstBackingIndex, explainIndex(client(), firstBackingIndex));
} else if (indexExists(downsampleIndexName)) {
logger.error("Index [{}] ilm explain {}", firstBackingIndex, explainIndex(client(), downsampleIndexName));
} else if (indexExists(downsampleOfDownsampleIndexName)) {
logger.error("Index [{}] ilm explain {}", firstBackingIndex, explainIndex(client(), downsampleOfDownsampleIndexName));
}
throw ae;
}
}

/**
* Gets the generated rollup index name for a given index by looking at newly created indices that match the rollup index name pattern
*
Expand Down

0 comments on commit 66f2383

Please sign in to comment.