Skip to content

Commit

Permalink
Introduce downsampling configuration for data stream lifecycle (#97041)
Browse files Browse the repository at this point in the history
This PR introduces downsampling configuration to the data stream lifecycle. Keep in mind that the downsampling implementation will come in a follow-up PR. The configuration looks like this:
```
{
  "lifecycle": {
    "data_retention": "90d",
    "downsampling": [
      {
        "after": "1d",
        "fixed_interval": "2h"
      },
      { "after": "15d", "fixed_interval": "1d" },
      { "after": "30d", "fixed_interval": "1w" }
    ]
  }
}
```
We will also support using `null` to unset downsampling configuration during template composition:
```
{
  "lifecycle": {
    "data_retention": "90d",
    "downsampling": null
  }
}
```
  • Loading branch information
gmarouli authored Jun 29, 2023
1 parent 69ff7df commit f87c2c7
Show file tree
Hide file tree
Showing 26 changed files with 564 additions and 121 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/97041.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 97041
summary: Introduce downsampling configuration for data stream lifecycle
area: Data streams
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public <T> void declareFieldArray(
*/
public abstract void declareExclusiveFieldSet(String... exclusiveSet);

private static <T, Context> List<T> parseArray(XContentParser parser, Context context, ContextParser<Context, T> itemParser)
public static <T, Context> List<T> parseArray(XContentParser parser, Context context, ContextParser<Context, T> itemParser)
throws IOException {
final XContentParser.Token currentToken = parser.currentToken();
if (currentToken.isValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.datastreams;

import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
Expand All @@ -20,14 +21,18 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettingProviders;
import org.elasticsearch.indices.EmptySystemIndices;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESSingleNodeTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -151,34 +156,33 @@ public void testLifecycleComposition() {
}
// One lifecycle results to this lifecycle as the final
{
DataLifecycle lifecycle = switch (randomInt(2)) {
case 0 -> new DataLifecycle();
case 1 -> new DataLifecycle(DataLifecycle.Retention.NULL);
default -> new DataLifecycle(randomMillisUpToYear9999());
};
DataLifecycle lifecycle = new DataLifecycle(randomRetention(), randomDownsampling());
List<DataLifecycle> lifecycles = List.of(lifecycle);
assertThat(composeDataLifecycles(lifecycles), equalTo(lifecycle));
DataLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If the last lifecycle is missing a property we keep the latest from the previous ones
{
DataLifecycle lifecycleWithRetention = new DataLifecycle(randomMillisUpToYear9999());
List<DataLifecycle> lifecycles = List.of(lifecycleWithRetention, new DataLifecycle());
assertThat(
composeDataLifecycles(lifecycles).getEffectiveDataRetention(),
equalTo(lifecycleWithRetention.getEffectiveDataRetention())
);
DataLifecycle lifecycle = new DataLifecycle(randomNonEmptyRetention(), randomNonEmptyDownsampling());
List<DataLifecycle> lifecycles = List.of(lifecycle, new DataLifecycle());
DataLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle.getEffectiveDataRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle.getDownsamplingRounds()));
}
// If both lifecycle have all properties, then the latest one overwrites all the others
{
DataLifecycle lifecycle1 = new DataLifecycle(randomMillisUpToYear9999());
DataLifecycle lifecycle2 = new DataLifecycle(randomMillisUpToYear9999());
DataLifecycle lifecycle1 = new DataLifecycle(randomNonEmptyRetention(), randomNonEmptyDownsampling());
DataLifecycle lifecycle2 = new DataLifecycle(randomNonEmptyRetention(), randomNonEmptyDownsampling());
List<DataLifecycle> lifecycles = List.of(lifecycle1, lifecycle2);
assertThat(composeDataLifecycles(lifecycles), equalTo(lifecycle2));
DataLifecycle result = composeDataLifecycles(lifecycles);
assertThat(result.getEffectiveDataRetention(), equalTo(lifecycle2.getEffectiveDataRetention()));
assertThat(result.getDownsamplingRounds(), equalTo(lifecycle2.getDownsamplingRounds()));
}
// If the last lifecycle is explicitly null, the result is also null
{
DataLifecycle lifecycle1 = new DataLifecycle(randomMillisUpToYear9999());
DataLifecycle lifecycle2 = new DataLifecycle(randomMillisUpToYear9999());
DataLifecycle lifecycle1 = new DataLifecycle(randomNonEmptyRetention(), randomNonEmptyDownsampling());
DataLifecycle lifecycle2 = new DataLifecycle(randomNonEmptyRetention(), randomNonEmptyDownsampling());
List<DataLifecycle> lifecycles = List.of(lifecycle1, lifecycle2, Template.NO_LIFECYCLE);
assertThat(composeDataLifecycles(lifecycles), nullValue());
}
Expand Down Expand Up @@ -224,4 +228,49 @@ public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNo
return new ShardLimitValidator(limitOnlySettings, clusterService);
}

@Nullable
private static DataLifecycle.Retention randomRetention() {
    // Three equally likely shapes: absent (null), the explicit NULL sentinel
    // (used to unset retention during template composition), or a concrete value.
    int shape = randomInt(2);
    if (shape == 0) {
        return null;
    }
    return shape == 1 ? DataLifecycle.Retention.NULL : randomNonEmptyRetention();
}

private static DataLifecycle.Retention randomNonEmptyRetention() {
    // A concrete retention period, anywhere up to year 9999.
    long retentionMillis = randomMillisUpToYear9999();
    return new DataLifecycle.Retention(TimeValue.timeValueMillis(retentionMillis));
}

@Nullable
private static DataLifecycle.Downsampling randomDownsampling() {
    // Three equally likely shapes: absent (null), the explicit NULL sentinel
    // (used to unset downsampling during template composition), or real rounds.
    int shape = randomInt(2);
    if (shape == 0) {
        return null;
    }
    return shape == 1 ? DataLifecycle.Downsampling.NULL : randomNonEmptyDownsampling();
}

private static DataLifecycle.Downsampling randomNonEmptyDownsampling() {
    // A chain of 1..11 rounds in which every later round starts after, and samples
    // more coarsely than, the round before it.
    var extraRounds = randomIntBetween(0, 10);
    List<DataLifecycle.Downsampling.Round> rounds = new ArrayList<>();
    DataLifecycle.Downsampling.Round previous = new DataLifecycle.Downsampling.Round(
        TimeValue.timeValueDays(randomIntBetween(1, 365)),
        new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
    );
    rounds.add(previous);
    for (int i = 0; i < extraRounds; i++) {
        previous = nextRound(previous);
        rounds.add(previous);
    }
    return new DataLifecycle.Downsampling(rounds);
}

private static DataLifecycle.Downsampling.Round nextRound(DataLifecycle.Downsampling.Round previous) {
    // The next round kicks in 1-10 days later than the previous one and uses a fixed
    // interval that is an exact multiple (2x-5x) of the previous round's interval.
    TimeValue after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10));
    long intervalMillis = previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5);
    DownsampleConfig config = new DownsampleConfig(new DateHistogramInterval(intervalMillis + "ms"));
    return new DataLifecycle.Downsampling.Round(after, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;

import static org.elasticsearch.dlm.DLMFixtures.putComposableIndexTemplate;
import static org.elasticsearch.dlm.DLMFixtures.randomDataLifecycle;
import static org.elasticsearch.dlm.DLMFixtures.randomLifecycle;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand All @@ -42,7 +42,7 @@ protected boolean ignoreExternalCluster() {
}

public void testGetLifecycle() throws Exception {
DataLifecycle lifecycle = randomDataLifecycle();
DataLifecycle lifecycle = randomLifecycle();
putComposableIndexTemplate("id1", null, List.of("with-lifecycle*"), null, null, lifecycle);
putComposableIndexTemplate("id2", null, List.of("without-lifecycle*"), null, null, null);
{
Expand Down
51 changes: 45 additions & 6 deletions modules/dlm/src/test/java/org/elasticsearch/dlm/DLMFixtures.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
Expand All @@ -22,16 +23,19 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.lucene.tests.util.LuceneTestCase.rarely;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
import static org.elasticsearch.test.ESIntegTestCase.client;
import static org.elasticsearch.test.ESTestCase.randomInt;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.elasticsearch.test.ESTestCase.randomMillisUpToYear9999;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -93,12 +97,47 @@ static void putComposableIndexTemplate(
assertTrue(client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet().isAcknowledged());
}

static DataLifecycle randomDataLifecycle() {
return switch (randomInt(3)) {
case 0 -> new DataLifecycle();
case 1 -> new DataLifecycle(DataLifecycle.Retention.NULL);
case 2 -> Template.NO_LIFECYCLE;
default -> new DataLifecycle(TimeValue.timeValueDays(randomIntBetween(1, 365)));
static DataLifecycle randomLifecycle() {
    // Occasionally hand back the explicit "no lifecycle" sentinel; otherwise a
    // lifecycle with independently randomized retention and downsampling settings.
    if (rarely()) {
        return Template.NO_LIFECYCLE;
    }
    return new DataLifecycle(randomRetention(), randomDownsampling());
}

@Nullable
private static DataLifecycle.Retention randomRetention() {
    // Three equally likely shapes: absent (null), the explicit NULL sentinel
    // (used to unset retention during template composition), or a concrete period.
    int shape = randomInt(2);
    if (shape == 0) {
        return null;
    } else if (shape == 1) {
        return DataLifecycle.Retention.NULL;
    } else {
        return new DataLifecycle.Retention(TimeValue.timeValueMillis(randomMillisUpToYear9999()));
    }
}

@Nullable
private static DataLifecycle.Downsampling randomDownsampling() {
    // Three equally likely shapes: absent (null), the explicit NULL sentinel
    // (used to unset downsampling during template composition), or real rounds.
    int shape = randomInt(2);
    if (shape == 0) {
        return null;
    }
    if (shape == 1) {
        return DataLifecycle.Downsampling.NULL;
    }
    // A chain of 1..11 rounds in which every later round starts after, and samples
    // more coarsely than, the round before it.
    var extraRounds = randomIntBetween(0, 10);
    List<DataLifecycle.Downsampling.Round> rounds = new ArrayList<>();
    DataLifecycle.Downsampling.Round previous = new DataLifecycle.Downsampling.Round(
        TimeValue.timeValueDays(randomIntBetween(1, 365)),
        new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
    );
    rounds.add(previous);
    for (int i = 0; i < extraRounds; i++) {
        previous = nextRound(previous);
        rounds.add(previous);
    }
    return new DataLifecycle.Downsampling(rounds);
}

private static DataLifecycle.Downsampling.Round nextRound(DataLifecycle.Downsampling.Round previous) {
    // The next round kicks in 1-10 days later than the previous one and uses a fixed
    // interval that is an exact multiple (2x-5x) of the previous round's interval.
    TimeValue after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10));
    long intervalMillis = previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5);
    DownsampleConfig config = new DownsampleConfig(new DateHistogramInterval(intervalMillis + "ms"));
    return new DataLifecycle.Downsampling.Round(after, config);
}
}
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@
opens org.elasticsearch.common.logging to org.apache.logging.log4j.core;

exports org.elasticsearch.action.dlm;
exports org.elasticsearch.action.downsample;

provides java.util.spi.CalendarDataProvider with org.elasticsearch.common.time.IsoCalendarDataProvider;
provides org.elasticsearch.xcontent.ErrorOnUnknown with org.elasticsearch.common.xcontent.SuggestingErrorOnUnknown;
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/TransportVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
public static final TransportVersion V_8_500_023 = registerTransportVersion(8_500_023, "01b06435-5d73-42ff-a121-3b36b771375e");
public static final TransportVersion V_8_500_024 = registerTransportVersion(8_500_024, "db337007-f823-4dbd-968e-375383814c17");
public static final TransportVersion V_8_500_025 = registerTransportVersion(8_500_025, "b2ab7b75-5ac2-4a3b-bbb6-8789ca66722d");
public static final TransportVersion V_8_500_026 = registerTransportVersion(8_500_026, "965d294b-14aa-4abb-bcfc-34631187941d");

private static class CurrentHolder {
private static final TransportVersion CURRENT = findCurrent(V_8_500_025);
private static final TransportVersion CURRENT = findCurrent(V_8_500_026);

// finds the pluggable current version, or uses the given fallback
private static TransportVersion findCurrent(TransportVersion fallback) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.xpack.core.downsample;

package org.elasticsearch.action.downsample;

import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.Strings;
Expand All @@ -29,7 +31,7 @@
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

/**
* This class holds the configuration details of a {@link DownsampleAction} that downsamples time series
* This class holds the configuration details of a DownsampleAction that downsamples time series
* (TSDB) indices. We have made a great effort to simplify the rollup configuration, and it
* currently requires only a fixed time interval. So, it has the following format:
*
Expand Down Expand Up @@ -97,6 +99,37 @@ public DownsampleConfig(final StreamInput in) throws IOException {
fixedInterval = new DateHistogramInterval(in);
}

/**
 * Validates that the target downsampling configuration can be applied to an index that has
 * already been downsampled with the source configuration. The requirements are:
 * - the target interval needs to be greater than the source interval, and
 * - the target interval needs to be a multiple of the source interval.
 *
 * @param source the configuration the index was previously downsampled with
 * @param target the configuration about to be applied
 * @throws IllegalArgumentException if the target interval is not acceptable
 */
public static void validateSourceAndTargetIntervals(DownsampleConfig source, DownsampleConfig target) {
    final long sourceMillis = source.fixedInterval.estimateMillis();
    final long targetMillis = target.fixedInterval.estimateMillis();
    if (targetMillis <= sourceMillis) {
        // Downsampling to an equal or finer interval than what the data already has is meaningless.
        throw new IllegalArgumentException(
            "Downsampling interval ["
                + target.fixedInterval
                + "] must be greater than the source index interval ["
                + source.fixedInterval
                + "]."
        );
    }
    if (targetMillis % sourceMillis != 0) {
        // Coarser buckets must align exactly with the existing buckets.
        throw new IllegalArgumentException(
            "Downsampling interval ["
                + target.fixedInterval
                + "] must be a multiple of the source index interval ["
                + source.fixedInterval
                + "]."
        );
    }
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
fixedInterval.writeTo(out);
Expand Down Expand Up @@ -154,11 +187,15 @@ public String getWriteableName() {
/**
 * Renders this configuration as a self-contained object, i.e. the
 * {@code fixed_interval} field wrapped in its own start/end object markers.
 */
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
    builder.startObject();
    toXContentFragment(builder);
    return builder.endObject();
}

/**
 * Writes only the {@code fixed_interval} field, without object markers, so this
 * configuration can be embedded inside an object managed by the caller.
 */
public XContentBuilder toXContentFragment(final XContentBuilder builder) throws IOException {
    return builder.field(FIXED_INTERVAL, fixedInterval.toString());
}

/**
 * Parses a {@link DownsampleConfig} from its XContent representation using {@code PARSER}.
 */
public static DownsampleConfig fromXContent(final XContentParser parser) throws IOException {
    return PARSER.parse(parser, null);
}
Expand Down
Loading

0 comments on commit f87c2c7

Please sign in to comment.