Skip to content

Commit

Permalink
[DSL] Refactor effective data retention (#105750)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarouli authored Mar 5, 2024
1 parent 30828a5 commit d65461d
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getEffectiveDataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(true));
}

Expand All @@ -189,7 +189,7 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getEffectiveDataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(false));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
if (internalCluster().numDataNodes() > 1) {
// If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run
assertThat(explainIndex.getError(), nullValue());
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());

if (explainIndex.getIndex().equals(DataStream.getDefaultBackingIndexName(dataStreamName, 1))) {
// first generation index was rolled over
Expand Down Expand Up @@ -243,7 +243,7 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getRolloverDate(), nullValue());
assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue());
// index has not been rolled over yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,38 +822,40 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
* @return The set of indices that delete requests have been sent for
*/
private Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
TimeValue retention = getRetentionConfiguration(dataStream);
Metadata metadata = state.metadata();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);
if (backingIndicesOlderThanRetention.isEmpty()) {
return Set.of();
}
Set<Index> indicesToBeRemoved = new HashSet<>();
if (retention != null) {
Metadata metadata = state.metadata();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);

for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.index(index);
assert backingIndex != null : "the data stream backing indices must exist";

IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
// we don't want to delete the source index if they have an in-progress downsampling operation because the
// target downsample index will remain in the system as a standalone index
if (downsampleStatus.equals(UNKNOWN)) {
indicesToBeRemoved.add(index);

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period");
} else {
// there's an opportunity here to cancel downsampling and delete the source index now
logger.trace(
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
retention,
downsampleStatus
);
}
// We know that there is lifecycle and retention because there are indices to be deleted
assert dataStream.getLifecycle() != null;
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention();
for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.index(index);
assert backingIndex != null : "the data stream backing indices must exist";

IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
// we don't want to delete the source index if they have an in-progress downsampling operation because the
// target downsample index will remain in the system as a standalone index
if (downsampleStatus.equals(UNKNOWN)) {
indicesToBeRemoved.add(index);

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
} else {
// there's an opportunity here to cancel downsampling and delete the source index now
logger.trace(
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
effectiveDataRetention,
downsampleStatus
);
}
}
}
Expand Down Expand Up @@ -1222,14 +1224,6 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
return customMetadata != null && customMetadata.containsKey(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY);
}

@Nullable
static TimeValue getRetentionConfiguration(DataStream dataStream) {
if (dataStream.getLifecycle() == null) {
return null;
}
return dataStream.getLifecycle().getEffectiveDataRetention();
}

/**
* @return the duration of the last run in millis or null if the service hasn't completed a run yet.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void testLifecycleComposition() {
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
// Defaults to true
assertThat(result.isEnabled(), equalTo(true));
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention()));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If the last lifecycle is missing a property (apart from enabled) we keep the latest from the previous ones
Expand All @@ -165,7 +165,7 @@ public void testLifecycleComposition() {
List<DataStreamLifecycle> lifecycles = List.of(lifecycle, new DataStreamLifecycle());
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.isEnabled(), equalTo(true));
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention()));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If both lifecycle have all properties, then the latest one overwrites all the others
Expand All @@ -183,7 +183,7 @@ public void testLifecycleComposition() {
List<DataStreamLifecycle> lifecycles = List.of(lifecycle1, lifecycle2);
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.isEnabled(), equalTo(lifecycle2.isEnabled()));
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle2.getEffectiveDataRetention()));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle2.getDataStreamRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle2.getDownsamplingRounds()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public DataStream snapshot(Collection<String> indicesInSnapshot) {
* is treated differently for the write index (i.e. they first need to be rolled over)
*/
public List<Index> getIndicesPastRetention(Function<String, IndexMetadata> indexMetadataSupplier, LongSupplier nowSupplier) {
if (lifecycle == null || lifecycle.getEffectiveDataRetention() == null) {
if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention() == null) {
return List.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public boolean isEnabled() {
*/
@Nullable
public TimeValue getEffectiveDataRetention() {
return getDataStreamRetention();
}

/**
* The least amount of time data the data stream is requesting es to keep the data.
* NOTE: this can be overriden by the {@link DataStreamLifecycle#getEffectiveDataRetention()}.
* @return the time period or null, null represents that data should never be deleted.
*/
@Nullable
public TimeValue getDataStreamRetention() {
return dataRetention == null ? null : dataRetention.value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ public void testGetIndicesPastRetentionWithOriginationDate() {
creationAndRolloverTimes,
settings(IndexVersion.current()),
new DataStreamLifecycle() {
public TimeValue getEffectiveDataRetention() {
public TimeValue getDataStreamRetention() {
return testRetentionReference.get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testIndexTemplateSwapsILMForDataStreamLifecycle() throws Exception {
// let's migrate this data stream to use the custom data stream lifecycle
client().execute(
PutDataStreamLifecycleAction.INSTANCE,
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getEffectiveDataRetention())
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getDataStreamRetention())
).actionGet();

assertBusy(() -> {
Expand Down Expand Up @@ -580,7 +580,7 @@ public void testUpdateIndexTemplateToDataStreamLifecyclePreference() throws Exce
// let's migrate this data stream to use the custom data stream lifecycle
client().execute(
PutDataStreamLifecycleAction.INSTANCE,
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getEffectiveDataRetention())
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getDataStreamRetention())
).actionGet();

// data stream was rolled over and has 4 indices, 2 managed by ILM, and 2 managed by the custom data stream lifecycle
Expand Down

0 comments on commit d65461d

Please sign in to comment.