From 3e3ee42589ea16cb44bb4eb4be7b3e5189336da3 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 8 Sep 2023 11:11:43 +0200 Subject: [PATCH] Add index.look_back_time setting for tsdb data streams (#98518) This change adds an `index.look_back_time` index setting that sets the `index.time_series.start_time` setting for the first backing index when a data stream is created. This allows accepting data that is older for initial indexing without changing the `index.look_ahead_time` setting. The `index.look_ahead_time` setting also controls the `index.time_series.end_time` setting, so changing it would affect rollovers as well. The default for the `index.look_back_time` is `2h`, which means documents with `@timestamp` up to 2 hours before the creation of the data stream are allowed to be indexed. This matches the behavior before this change, where the default `index.look_ahead_time` (also `2h`) was used to set `index.time_series.start_time` of the first backing index. Closes #98463 --- docs/changelog/98518.yaml | 6 ++ .../data-streams/set-up-tsds.asciidoc | 1 + .../data-streams/tsds-index-settings.asciidoc | 9 +++ docs/reference/data-streams/tsds.asciidoc | 16 +++++ .../datastreams/TsdbDataStreamRestIT.java | 65 +++++++++++++++++++ .../DataStreamIndexSettingsProvider.java | 3 +- .../datastreams/DataStreamsPlugin.java | 10 ++- .../DataStreamIndexSettingsProviderTests.java | 28 +++++++- .../datastreams/LookAHeadTimeTests.java | 19 ++++-- 9 files changed, 148 insertions(+), 9 deletions(-) create mode 100644 docs/changelog/98518.yaml diff --git a/docs/changelog/98518.yaml b/docs/changelog/98518.yaml new file mode 100644 index 0000000000000..2f961fc11ce69 --- /dev/null +++ b/docs/changelog/98518.yaml @@ -0,0 +1,6 @@ +pr: 98518 +summary: Add `index.look_back_time` setting for tsdb data streams +area: TSDB +type: enhancement +issues: + - 98463 diff --git a/docs/reference/data-streams/set-up-tsds.asciidoc b/docs/reference/data-streams/set-up-tsds.asciidoc index 3c15011871f89..a98e3c7302424 100644 --- 
a/docs/reference/data-streams/set-up-tsds.asciidoc +++ b/docs/reference/data-streams/set-up-tsds.asciidoc @@ -177,6 +177,7 @@ Optionally, the index settings component template for a TSDS can include: * Your lifecycle policy in the `index.lifecycle.name` index setting. * The <> index setting. +* The <<index-look-back-time,`index.look_back_time`>> index setting. * Other index settings, such as <>, for your TSDS's backing indices. diff --git a/docs/reference/data-streams/tsds-index-settings.asciidoc b/docs/reference/data-streams/tsds-index-settings.asciidoc index fa5c9b8cd821f..8091163ffe883 100644 --- a/docs/reference/data-streams/tsds-index-settings.asciidoc +++ b/docs/reference/data-streams/tsds-index-settings.asciidoc @@ -33,6 +33,15 @@ days). Only indices with an `index.mode` of `time_series` support this setting. For more information, refer to <>. Additionally this setting can not be less than `time_series.poll_interval` cluster setting. +[[index-look-back-time]] +`index.look_back_time`:: +(<<_static_index_settings,Static>>, <<time-units,time units>>) +Interval used to calculate the `index.time_series.start_time` for a TSDS's first +backing index when a tsdb data stream is created. Defaults to `2h` (2 hours). +Accepts `1m` (one minute) to `7d` (seven days). Only indices with an `index.mode` +of `time_series` support this setting. For more information, +refer to <<tsds-look-back-time>>. + [[index-routing-path]] `index.routing_path`:: (<<_static_index_settings,Static>>, string or array of strings) Plain `keyword` fields used to route documents in a TSDS to index shards. Supports wildcards diff --git a/docs/reference/data-streams/tsds.asciidoc b/docs/reference/data-streams/tsds.asciidoc index 3f49a7ab8c700..d6e9ea08f0892 100644 --- a/docs/reference/data-streams/tsds.asciidoc +++ b/docs/reference/data-streams/tsds.asciidoc @@ -253,6 +253,22 @@ value borders the `index.time_series.start_time` for the new write index. This ensures the `@timestamp` ranges for neighboring backing indices always border but never overlap. 
+[discrete] +[[tsds-look-back-time]] +==== Look-back time + +Use the <> index setting to +configure how far in the past you can add documents to an index. When you +create a data stream for a TSDS, {es} calculates the index's +`index.time_series.start_time` value as: + +`now - index.look_back_time` + +This setting is only used when a data stream gets created and controls +the `index.time_series.start_time` index setting of the first backing index. +Configuring this index setting can be useful to accept documents with `@timestamp` +field values that are older than 2 hours (the `index.look_back_time` default). + [discrete] [[tsds-accepted-time-range]] ==== Accepted time range for adding data diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java index 1f27c43f2f2f4..2a4b6f0c5a5ee 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java @@ -10,7 +10,9 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.test.rest.ObjectPath; import org.junit.Before; @@ -618,6 +620,69 @@ public void testUpdateComponentTemplateDoesNotFailIndexTemplateValidation() thro client().performRequest(request); } + public void testLookBackTime() throws IOException { + // Create template that uses index.look_back_time index setting: + String template = """ + { + "index_patterns": ["test*"], + "template": { + "settings":{ + "index": { + "look_back_time": "24h", + "number_of_replicas": 0, + "mode": "time_series" + 
} + }, + "mappings":{ + "properties": { + "@timestamp" : { + "type": "date" + }, + "field": { + "type": "keyword", + "time_series_dimension": true + } + } + } + }, + "data_stream": {} + }"""; + var putIndexTemplateRequest = new Request("PUT", "/_index_template/2"); + putIndexTemplateRequest.setJsonEntity(template); + assertOK(client().performRequest(putIndexTemplateRequest)); + + // Create data stream: + var createDataStreamRequest = new Request("PUT", "/_data_stream/test123"); + assertOK(client().performRequest(createDataStreamRequest)); + + // Check data stream has been created: + var getDataStreamsRequest = new Request("GET", "/_data_stream"); + var response = client().performRequest(getDataStreamsRequest); + assertOK(response); + var dataStreams = entityAsMap(response); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("test123")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("2")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1)); + String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name"); + assertThat(firstBackingIndex, backingIndexEqualTo("test123", 1)); + + // Check the backing index: + // 2023-08-15T04:35:50.000Z + var indices = getIndex(firstBackingIndex); + var escapedBackingIndex = firstBackingIndex.replace(".", "\\."); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("test123")); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series")); + String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); + assertThat(startTimeFirstBackingIndex, notNullValue()); + Instant now = Instant.now(); + 
Instant startTime = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(startTimeFirstBackingIndex)).toInstant(); + assertTrue(now.minus(24, ChronoUnit.HOURS).isAfter(startTime)); + String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); + assertThat(endTimeFirstBackingIndex, notNullValue()); + } + private static Map getIndex(String indexName) throws IOException { var getIndexRequest = new Request("GET", "/" + indexName + "?human"); var response = client().performRequest(getIndexRequest); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java index f4e660c2b18f4..064030ed2b6d5 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java @@ -84,10 +84,11 @@ public Settings getAdditionalIndexSettings( if (indexMode == IndexMode.TIME_SERIES) { Settings.Builder builder = Settings.builder(); TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(allSettings); + TimeValue lookBackTime = DataStreamsPlugin.LOOK_BACK_TIME.get(allSettings); final Instant start; final Instant end; if (dataStream == null || migrating) { - start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookAheadTime.getMillis())); + start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookBackTime.getMillis())); end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis())); } else { IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex()); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java 
b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 3f8b7e40eeb43..cd221ada7a4dc 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -105,7 +105,14 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin { Setting.Property.Dynamic ); public static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle"; - + public static final Setting LOOK_BACK_TIME = Setting.timeSetting( + "index.look_back_time", + TimeValue.timeValueHours(2), + TimeValue.timeValueMinutes(1), + TimeValue.timeValueDays(7), + Setting.Property.IndexScope, + Setting.Property.Dynamic + ); // The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this: private final SetOnce updateTimeSeriesRangeService = new SetOnce<>(); private final SetOnce errorStoreInitialisationService = new SetOnce<>(); @@ -141,6 +148,7 @@ public List> getSettings() { List> pluginSettings = new ArrayList<>(); pluginSettings.add(TIME_SERIES_POLL_INTERVAL); pluginSettings.add(LOOK_AHEAD_TIME); + pluginSettings.add(LOOK_BACK_TIME); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING); diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java index 27fe65ba309d3..23a86b657b82d 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java +++ 
b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java @@ -38,6 +38,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase { + private static final TimeValue DEFAULT_LOOK_BACK_TIME = TimeValue.timeValueHours(2); // default private static final TimeValue DEFAULT_LOOK_AHEAD_TIME = TimeValue.timeValueHours(2); // default DataStreamIndexSettingsProvider provider; @@ -83,7 +84,7 @@ public void testGetAdditionalIndexSettings() throws Exception { List.of(new CompressedXContent(mapping)) ); assertThat(result.size(), equalTo(3)); - assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), contains("field3")); } @@ -235,10 +236,31 @@ public void testGetAdditionalIndexSettingsLookAheadTime() throws Exception { List.of(new CompressedXContent("{}")) ); assertThat(result.size(), equalTo(2)); - assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis()))); } + public void testGetAdditionalIndexSettingsLookBackTime() throws Exception { + Metadata metadata = Metadata.EMPTY_METADATA; + String dataStreamName = "logs-app1"; + + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); + TimeValue lookBackTime = TimeValue.timeValueHours(12); + Settings settings = builder().put("index.mode", "time_series").put("index.look_back_time", 
lookBackTime.getStringRep()).build(); + Settings result = provider.getAdditionalIndexSettings( + DataStream.getDefaultBackingIndexName(dataStreamName, 1), + dataStreamName, + true, + metadata, + now, + settings, + List.of(new CompressedXContent("{}")) + ); + assertThat(result.size(), equalTo(2)); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookBackTime.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); + } + public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() throws Exception { String dataStreamName = "logs-app1"; TimeValue lookAheadTime = TimeValue.timeValueHours(2); @@ -358,7 +380,7 @@ public void testGetAdditionalIndexSettingsMigrateToTsdb() { List.of() ); assertThat(result.size(), equalTo(2)); - assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java index a0ed1a83d0de1..a612587262463 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.After; -import java.io.IOException; import java.util.Collection; import java.util.List; @@ -52,7 +51,7 @@ public void testTimeSeriesPollIntervalSettingToHigh() { assertThat(e.getMessage(), equalTo("failed to parse value [11m] for setting 
[time_series.poll_interval], must be <= [10m]")); } - public void testLookAheadTimeSetting() throws IOException { + public void testLookAheadTimeSetting() { var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "10m").build(); updateIndexSettings(settings); } @@ -69,6 +68,18 @@ public void testLookAheadTimeSettingToHigh() { assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_ahead_time], must be <= [7d]")); } + public void testLookBackTimeSettingToLow() { + var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "1s").build(); + var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings)); + assertThat(e.getMessage(), equalTo("failed to parse value [1s] for setting [index.look_back_time], must be >= [1m]")); + } + + public void testLookBackTimeSettingToHigh() { + var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "8d").build(); + var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings)); + assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_back_time], must be <= [7d]")); + } + public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() { { var settings = Settings.builder() @@ -99,7 +110,7 @@ public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() { } } - public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() throws IOException { + public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() { var clusterSettings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m").build(); updateClusterSettings(clusterSettings); var indexSettings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "100m").build(); @@ -110,7 +121,7 @@ private void updateClusterSettings(Settings settings) { clusterAdmin().updateSettings(new 
ClusterUpdateSettingsRequest().persistentSettings(settings)).actionGet(); } - private void updateIndexSettings(Settings settings) throws IOException { + private void updateIndexSettings(Settings settings) { try { createIndex("test"); } catch (ResourceAlreadyExistsException e) {