diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java index e33b1fdcfa57a..b772e0bb347e2 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudDataStreamLifecycleIT.java @@ -164,7 +164,7 @@ public void testPutLifecycle() throws Exception { ).get(); assertThat(response.getDataStreamLifecycles().size(), equalTo(1)); assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream")); - assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getEffectiveDataRetention(), equalTo(dataRetention)); + assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention)); assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(true)); } @@ -189,7 +189,7 @@ public void testPutLifecycle() throws Exception { ).get(); assertThat(response.getDataStreamLifecycles().size(), equalTo(1)); assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream")); - assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getEffectiveDataRetention(), equalTo(dataRetention)); + assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention)); assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(false)); } } diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java index 471622489d9b2..a497eed121b0c 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java @@ -118,7 +118,7 @@ public void testExplainLifecycle() throws Exception { assertThat(explainIndex.isManagedByLifecycle(), is(true)); assertThat(explainIndex.getIndexCreationDate(), notNullValue()); assertThat(explainIndex.getLifecycle(), notNullValue()); - assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue()); + assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue()); if (internalCluster().numDataNodes() > 1) { // If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run assertThat(explainIndex.getError(), nullValue()); @@ -175,7 +175,7 @@ public void testExplainLifecycle() throws Exception { assertThat(explainIndex.isManagedByLifecycle(), is(true)); assertThat(explainIndex.getIndexCreationDate(), notNullValue()); assertThat(explainIndex.getLifecycle(), notNullValue()); - assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue()); + assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue()); if (explainIndex.getIndex().equals(DataStream.getDefaultBackingIndexName(dataStreamName, 1))) { // first generation index was rolled over @@ -243,7 +243,7 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception { assertThat(explainIndex.isManagedByLifecycle(), is(true)); assertThat(explainIndex.getIndexCreationDate(), notNullValue()); assertThat(explainIndex.getLifecycle(), notNullValue()); - assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue()); + assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue()); assertThat(explainIndex.getRolloverDate(), nullValue()); assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue()); // index has not been rolled over yet diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 8b15d6a4b7bdf..1b875c28f7f43 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -822,38 +822,40 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) { * @return The set of indices that delete requests have been sent for */ private Set maybeExecuteRetention(ClusterState state, DataStream dataStream, Set indicesToExcludeForRemainingRun) { - TimeValue retention = getRetentionConfiguration(dataStream); + Metadata metadata = state.metadata(); + List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier); + if (backingIndicesOlderThanRetention.isEmpty()) { + return Set.of(); + } Set indicesToBeRemoved = new HashSet<>(); - if (retention != null) { - Metadata metadata = state.metadata(); - List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier); - - for (Index index : backingIndicesOlderThanRetention) { - if (indicesToExcludeForRemainingRun.contains(index) == false) { - IndexMetadata backingIndex = metadata.index(index); - assert backingIndex != null : "the data stream backing indices must exist"; - - IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings()); - // we don't want to delete the source index if they have an in-progress downsampling operation because the - // target downsample index will remain in the system as a standalone index - if (downsampleStatus.equals(UNKNOWN)) { - indicesToBeRemoved.add(index); - - // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) - // let's start simple and reevaluate - String indexName = backingIndex.getIndex().getName(); - deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period"); - } else { - // there's an opportunity here to cancel downsampling and delete the source index now - logger.trace( - "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed " - + "because there's a downsampling operation currently in progress for this index. Current downsampling " - + "status is [{}]. When downsampling completes, DSL will delete this index.", - index.getName(), - retention, - downsampleStatus - ); - } + // We know that there is lifecycle and retention because there are indices to be deleted + assert dataStream.getLifecycle() != null; + TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(); + for (Index index : backingIndicesOlderThanRetention) { + if (indicesToExcludeForRemainingRun.contains(index) == false) { + IndexMetadata backingIndex = metadata.index(index); + assert backingIndex != null : "the data stream backing indices must exist"; + + IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings()); + // we don't want to delete the source index if they have an in-progress downsampling operation because the + // target downsample index will remain in the system as a standalone index + if (downsampleStatus.equals(UNKNOWN)) { + indicesToBeRemoved.add(index); + + // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) + // let's start simple and reevaluate + String indexName = backingIndex.getIndex().getName(); + deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period"); + } else { + // there's an opportunity here to cancel downsampling and delete the source index now + logger.trace( + "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed " + + "because there's a downsampling operation currently in progress for this index. Current downsampling " + + "status is [{}]. When downsampling completes, DSL will delete this index.", + index.getName(), + effectiveDataRetention, + downsampleStatus + ); } } } @@ -1222,14 +1224,6 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) { return customMetadata != null && customMetadata.containsKey(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY); } - @Nullable - static TimeValue getRetentionConfiguration(DataStream dataStream) { - if (dataStream.getLifecycle() == null) { - return null; - } - return dataStream.getLifecycle().getEffectiveDataRetention(); - } - /** * @return the duration of the last run in millis or null if the service hasn't completed a run yet. */ diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java index e7339cc3f334a..d1e07aacaddce 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java @@ -151,7 +151,7 @@ public void testLifecycleComposition() { DataStreamLifecycle result = composeDataLifecycles(lifecycles); // Defaults to true assertThat(result.isEnabled(), equalTo(true)); - assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention())); + assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention())); assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds())); } // If the last lifecycle is missing a property (apart from enabled) we keep the latest from the previous ones @@ -165,7 +165,7 @@ public void testLifecycleComposition() { List lifecycles = List.of(lifecycle, new DataStreamLifecycle()); DataStreamLifecycle result = composeDataLifecycles(lifecycles); assertThat(result.isEnabled(), equalTo(true)); - assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention())); + assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention())); assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds())); } // If both lifecycle have all properties, then the latest one overwrites all the others @@ -183,7 +183,7 @@ public void testLifecycleComposition() { List lifecycles = List.of(lifecycle1, lifecycle2); DataStreamLifecycle result = composeDataLifecycles(lifecycles); assertThat(result.isEnabled(), equalTo(lifecycle2.isEnabled())); - assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle2.getEffectiveDataRetention())); + assertThat(result.getDataStreamRetention(), equalTo(lifecycle2.getDataStreamRetention())); assertThat(result.getDownsamplingRounds(), equalTo(lifecycle2.getDownsamplingRounds())); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 14de79636be0d..1bcfdba1d16f4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -705,7 +705,7 @@ public DataStream snapshot(Collection indicesInSnapshot) { * is treated differently for the write index (i.e. they first need to be rolled over) */ public List getIndicesPastRetention(Function indexMetadataSupplier, LongSupplier nowSupplier) { - if (lifecycle == null || lifecycle.getEffectiveDataRetention() == null) { + if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention() == null) { return List.of(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java index 215ed515748ab..b4a3a1eb3502a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java @@ -134,6 +134,16 @@ public boolean isEnabled() { */ @Nullable public TimeValue getEffectiveDataRetention() { + return getDataStreamRetention(); + } + + /** + * The least amount of time data the data stream is requesting es to keep the data. + * NOTE: this can be overriden by the {@link DataStreamLifecycle#getEffectiveDataRetention()}. + * @return the time period or null, null represents that data should never be deleted. + */ + @Nullable + public TimeValue getDataStreamRetention() { return dataRetention == null ? null : dataRetention.value; } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 1c4cb8c0681ff..9f7d6b49b0844 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -1249,7 +1249,7 @@ public void testGetIndicesPastRetentionWithOriginationDate() { creationAndRolloverTimes, settings(IndexVersion.current()), new DataStreamLifecycle() { - public TimeValue getEffectiveDataRetention() { + public TimeValue getDataStreamRetention() { return testRetentionReference.get(); } } diff --git a/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataStreamAndIndexLifecycleMixingTests.java b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataStreamAndIndexLifecycleMixingTests.java index 637fbc8f8bf82..b9c58f728d1e3 100644 --- a/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataStreamAndIndexLifecycleMixingTests.java +++ b/x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/DataStreamAndIndexLifecycleMixingTests.java @@ -238,7 +238,7 @@ public void testIndexTemplateSwapsILMForDataStreamLifecycle() throws Exception { // let's migrate this data stream to use the custom data stream lifecycle client().execute( PutDataStreamLifecycleAction.INSTANCE, - new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getEffectiveDataRetention()) + new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getDataStreamRetention()) ).actionGet(); assertBusy(() -> { @@ -580,7 +580,7 @@ public void testUpdateIndexTemplateToDataStreamLifecyclePreference() throws Exce // let's migrate this data stream to use the custom data stream lifecycle client().execute( PutDataStreamLifecycleAction.INSTANCE, - new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getEffectiveDataRetention()) + new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getDataStreamRetention()) ).actionGet(); // data stream was rolled over and has 4 indices, 2 managed by ILM, and 2 managed by the custom data stream lifecycle