Skip to content

Commit

Permalink
[TEST] Assert DSL merge policy respects end date (elastic#113038)
Browse files Browse the repository at this point in the history
[TEST] Assert DSL merge policy respects end date

Backing indexes with an end date in the future may still get writes,
so DSL should not apply the merge policy (first configuring the
settings on the index, then doing the force merge) until that time has
passed. The implementation already does this, because
`DataStreamLifecycleService.run()` calls
`timeSeriesIndicesStillWithinTimeBounds` and adds the resulting
indices to `indicesToExcludeForRemainingRun` before calling
`maybeExecuteForceMerge`. This change simply adds a unit test to
ensure that this behaviour does not regress.

Closes elastic#109030
  • Loading branch information
PeteGillinElastic authored Sep 18, 2024
1 parent 63e0897 commit 81041b4
Showing 1 changed file with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.TARGET_MERGE_FACTOR_VALUE;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -326,7 +327,55 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() {
TransportRequest deleteIndexRequest = clientSeenRequests.get(1);
assertThat(deleteIndexRequest, instanceOf(DeleteIndexRequest.class));
// only the first generation index should be eligible for retention
assertThat(((DeleteIndexRequest) deleteIndexRequest).indices(), is(new String[] { dataStream.getIndices().get(0).getName() }));
assertThat(((DeleteIndexRequest) deleteIndexRequest).indices(), arrayContaining(dataStream.getIndices().getFirst().getName()));
}

public void testMergePolicyNotExecutedForTSIndicesWithinTimeBounds() {
Instant currentTime = Instant.ofEpochMilli(now).truncatedTo(ChronoUnit.MILLIS);
// These ranges are on the edge of each other temporal boundaries.
Instant start1 = currentTime.minus(6, ChronoUnit.HOURS);
Instant end1 = currentTime.minus(4, ChronoUnit.HOURS);
Instant start2 = currentTime.minus(4, ChronoUnit.HOURS);
Instant end2 = currentTime.plus(2, ChronoUnit.HOURS);
Instant start3 = currentTime.plus(2, ChronoUnit.HOURS);
Instant end3 = currentTime.plus(4, ChronoUnit.HOURS);

String dataStreamName = "logs_my-app_prod";
var clusterState = DataStreamTestHelper.getClusterStateWithDataStream(
dataStreamName,
List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2), Tuple.tuple(start3, end3))
);
Metadata.Builder builder = Metadata.builder(clusterState.metadata());
DataStream dataStream = builder.dataStream(dataStreamName);
// Overwrite the data stream in the cluster state to set the lifecycle policy, with no retention policy (i.e. infinite retention).
builder.put(
dataStream.copy()
.setName(dataStreamName)
.setGeneration(dataStream.getGeneration() + 1)
.setLifecycle(DataStreamLifecycle.newBuilder().build())
.build()
);
clusterState = ClusterState.builder(clusterState).metadata(builder).build();

dataStreamLifecycleService.run(clusterState);
// There should be two client requests: one rollover, and one to update the merge policy settings. N.B. The merge policy settings
// will always be updated before the force merge is done, see testMergePolicySettingsAreConfiguredBeforeForcemerge.
assertThat(clientSeenRequests.size(), is(2));
assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
TransportRequest updateSettingsRequest = clientSeenRequests.get(1);
assertThat(updateSettingsRequest, instanceOf(UpdateSettingsRequest.class));
// Only the first generation index should be eligible for merging. The other have end dates in the future.
assertThat(
((UpdateSettingsRequest) updateSettingsRequest).indices(),
arrayContaining(dataStream.getIndices().getFirst().getName())
);
assertThat(
((UpdateSettingsRequest) updateSettingsRequest).settings().keySet(),
containsInAnyOrder(
MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(),
MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey()
)
);
}

public void testRetentionSkippedWhilstDownsamplingInProgress() {
Expand Down

0 comments on commit 81041b4

Please sign in to comment.