Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DSL] Refactor effective data retention #105750

Merged
merged 7 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getEffectiveDataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(true));
}

Expand All @@ -189,7 +189,7 @@ public void testPutLifecycle() throws Exception {
).get();
assertThat(response.getDataStreamLifecycles().size(), equalTo(1));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("my-data-stream"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getEffectiveDataRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().getDataStreamRetention(), equalTo(dataRetention));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle().isEnabled(), equalTo(false));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
if (internalCluster().numDataNodes() > 1) {
// If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run
assertThat(explainIndex.getError(), nullValue());
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testExplainLifecycle() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());

if (explainIndex.getIndex().equals(DataStream.getDefaultBackingIndexName(dataStreamName, 1))) {
// first generation index was rolled over
Expand Down Expand Up @@ -243,7 +243,7 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(explainIndex.getLifecycle().getEffectiveDataRetention(), nullValue());
assertThat(explainIndex.getLifecycle().getDataStreamRetention(), nullValue());
assertThat(explainIndex.getRolloverDate(), nullValue());
assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue());
// index has not been rolled over yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,38 +822,40 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
* @return The set of indices that delete requests have been sent for
*/
private Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
TimeValue retention = getRetentionConfiguration(dataStream);
Metadata metadata = state.metadata();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);
if (backingIndicesOlderThanRetention.isEmpty()) {
return Set.of();
}
Set<Index> indicesToBeRemoved = new HashSet<>();
if (retention != null) {
Metadata metadata = state.metadata();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);

for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.index(index);
assert backingIndex != null : "the data stream backing indices must exist";

IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
// we don't want to delete the source index if they have an in-progress downsampling operation because the
// target downsample index will remain in the system as a standalone index
if (downsampleStatus.equals(UNKNOWN)) {
indicesToBeRemoved.add(index);

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period");
} else {
// there's an opportunity here to cancel downsampling and delete the source index now
logger.trace(
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
retention,
downsampleStatus
);
}
// We know that there is lifecycle and retention because there are indices to be deleted
assert dataStream.getLifecycle() != null;
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention();
for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.index(index);
assert backingIndex != null : "the data stream backing indices must exist";

IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
// we don't want to delete the source index if they have an in-progress downsampling operation because the
// target downsample index will remain in the system as a standalone index
if (downsampleStatus.equals(UNKNOWN)) {
indicesToBeRemoved.add(index);

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + effectiveDataRetention + "] retention period");
} else {
// there's an opportunity here to cancel downsampling and delete the source index now
logger.trace(
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
effectiveDataRetention,
downsampleStatus
);
}
}
}
Expand Down Expand Up @@ -1222,14 +1224,6 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
return customMetadata != null && customMetadata.containsKey(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY);
}

/**
 * Looks up the retention configured on a data stream's lifecycle.
 *
 * @param dataStream the data stream whose lifecycle is inspected
 * @return the effective data retention of the data stream's lifecycle, or {@code null}
 *         when the data stream has no lifecycle configured (or no retention is set)
 */
@Nullable
static TimeValue getRetentionConfiguration(DataStream dataStream) {
    DataStreamLifecycle lifecycle = dataStream.getLifecycle();
    // No lifecycle means no retention can apply.
    return lifecycle == null ? null : lifecycle.getEffectiveDataRetention();
}

/**
* @return the duration of the last run in millis or null if the service hasn't completed a run yet.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void testLifecycleComposition() {
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
// Defaults to true
assertThat(result.isEnabled(), equalTo(true));
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention()));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If the last lifecycle is missing a property (apart from enabled) we keep the latest from the previous ones
Expand All @@ -165,7 +165,7 @@ public void testLifecycleComposition() {
List<DataStreamLifecycle> lifecycles = List.of(lifecycle, new DataStreamLifecycle());
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.isEnabled(), equalTo(true));
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention()));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle.getDataStreamRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If both lifecycle have all properties, then the latest one overwrites all the others
Expand All @@ -183,7 +183,7 @@ public void testLifecycleComposition() {
List<DataStreamLifecycle> lifecycles = List.of(lifecycle1, lifecycle2);
DataStreamLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.isEnabled(), equalTo(lifecycle2.isEnabled()));
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle2.getEffectiveDataRetention()));
assertThat(result.getDataStreamRetention(), equalTo(lifecycle2.getDataStreamRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle2.getDownsamplingRounds()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public DataStream snapshot(Collection<String> indicesInSnapshot) {
* is treated differently for the write index (i.e. they first need to be rolled over)
*/
public List<Index> getIndicesPastRetention(Function<String, IndexMetadata> indexMetadataSupplier, LongSupplier nowSupplier) {
if (lifecycle == null || lifecycle.getEffectiveDataRetention() == null) {
if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention() == null) {
return List.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public boolean isEnabled() {
*/
@Nullable
public TimeValue getEffectiveDataRetention() {
    // Currently the effective retention is exactly the retention configured on the
    // data stream itself; kept as a separate accessor so callers that care about the
    // "effective" value are insulated from future overrides.
    TimeValue configuredRetention = getDataStreamRetention();
    return configuredRetention;
}

/**
 * The minimum amount of time for which the data stream requests that its data be kept.
 * NOTE: this value can be overridden via {@link DataStreamLifecycle#getEffectiveDataRetention()}.
 *
 * @return the configured retention period, or {@code null} meaning the data should never be deleted
 */
@Nullable
public TimeValue getDataStreamRetention() {
    if (dataRetention == null) {
        return null;
    }
    return dataRetention.value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ public void testGetIndicesPastRetentionWithOriginationDate() {
creationAndRolloverTimes,
settings(IndexVersion.current()),
new DataStreamLifecycle() {
public TimeValue getEffectiveDataRetention() {
public TimeValue getDataStreamRetention() {
return testRetentionReference.get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testIndexTemplateSwapsILMForDataStreamLifecycle() throws Exception {
// let's migrate this data stream to use the custom data stream lifecycle
client().execute(
PutDataStreamLifecycleAction.INSTANCE,
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getEffectiveDataRetention())
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getDataStreamRetention())
).actionGet();

assertBusy(() -> {
Expand Down Expand Up @@ -580,7 +580,7 @@ public void testUpdateIndexTemplateToDataStreamLifecyclePreference() throws Exce
// let's migrate this data stream to use the custom data stream lifecycle
client().execute(
PutDataStreamLifecycleAction.INSTANCE,
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getEffectiveDataRetention())
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, customLifecycle.getDataStreamRetention())
).actionGet();

// data stream was rolled over and has 4 indices, 2 managed by ILM, and 2 managed by the custom data stream lifecycle
Expand Down