From 577d3e2e9c302ecbe0196732da7d89af6dd3a187 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 4 Jul 2022 11:38:46 +0200 Subject: [PATCH] Skip backing indices with a disjoint range on @timestamp field. (#85162) Implicitly skip backing indices with a time series range that doesn't match with a required filter on @timestamp field. Relates to #74660 --- .../datastreams/TSDBIndexingIT.java | 87 ++++++++++ .../TimestampFieldMapperServiceTests.java | 117 ++++++++++++++ .../cluster/metadata/IndexMetadata.java | 13 +- .../org/elasticsearch/index/IndexMode.java | 22 +++ .../query/CoordinatorRewriteContext.java | 6 +- .../CoordinatorRewriteContextProvider.java | 17 +- .../indices/TimestampFieldMapperService.java | 6 + .../CanMatchPreFilterSearchPhaseTests.java | 149 ++++++++++++++++-- 8 files changed, 388 insertions(+), 29 deletions(-) create mode 100644 modules/data-streams/src/test/java/org/elasticsearch/datastreams/TimestampFieldMapperServiceTests.java diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java index 434db543e54e0..1e46dc00143d6 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.Template; @@ -21,12 +22,16 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.xcontent.XContentType; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -36,6 +41,18 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase { + public static final String MAPPING_TEMPLATE = """ + { + "_doc":{ + "properties": { + "metricset": { + "type": "keyword", + "time_series_dimension": true + } + } + } + }"""; + private static final String DOC = """ { "@timestamp": "$time", @@ -341,6 +358,76 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception { assertThat(e.getCause().getMessage(), equalTo("[index.routing_path] requires [index.mode=time_series]")); } + public void testSkippingShards() throws Exception { + Instant time = Instant.now(); + { + var templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build(); + var request = new PutComposableIndexTemplateAction.Request("id1"); + request.indexTemplate( + new ComposableIndexTemplate( + List.of("pattern-1"), + new Template(templateSettings, new CompressedXContent(MAPPING_TEMPLATE), null), + null, + null, + null, + null, + new ComposableIndexTemplate.DataStreamTemplate(false, false), + null + ) + ); + client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet(); + var indexRequest = new IndexRequest("pattern-1").opType(DocWriteRequest.OpType.CREATE).setRefreshPolicy("true"); + indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON); + client().index(indexRequest).actionGet(); + } + { + var request = new PutComposableIndexTemplateAction.Request("id2"); + request.indexTemplate( + new ComposableIndexTemplate( + List.of("pattern-2"), + new Template(null, new CompressedXContent(MAPPING_TEMPLATE), null), + null, + null, + null, + null, + new ComposableIndexTemplate.DataStreamTemplate(false, false), + null + ) + ); + client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet(); + var indexRequest = new IndexRequest("pattern-2").opType(DocWriteRequest.OpType.CREATE).setRefreshPolicy("true"); + indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON); + client().index(indexRequest).actionGet(); + } + { + var matchingRange = new SearchSourceBuilder().query( + new RangeQueryBuilder("@timestamp").from(time.minusSeconds(1).toEpochMilli()).to(time.plusSeconds(1).toEpochMilli()) + ); + var searchRequest = new SearchRequest("pattern-*"); + searchRequest.setPreFilterShardSize(1); + searchRequest.source(matchingRange); + var searchResponse = client().search(searchRequest).actionGet(); + ElasticsearchAssertions.assertHitCount(searchResponse, 2); + assertThat(searchResponse.getTotalShards(), equalTo(2)); + assertThat(searchResponse.getSkippedShards(), equalTo(0)); + assertThat(searchResponse.getSuccessfulShards(), equalTo(2)); + } + { + var nonMatchingRange = new SearchSourceBuilder().query( + new RangeQueryBuilder("@timestamp").from(time.minus(2, ChronoUnit.DAYS).toEpochMilli()) + .to(time.minus(1, ChronoUnit.DAYS).toEpochMilli()) + ); + var searchRequest = new SearchRequest("pattern-*"); + searchRequest.setPreFilterShardSize(1); + searchRequest.source(nonMatchingRange); + var searchResponse = client().search(searchRequest).actionGet(); + ElasticsearchAssertions.assertNoSearchHits(searchResponse); + assertThat(searchResponse.getTotalShards(), equalTo(2)); + assertThat(searchResponse.getSkippedShards(), equalTo(1)); + assertThat(searchResponse.getSuccessfulShards(), equalTo(2)); + } + } + static String formatInstant(Instant instant) { return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/TimestampFieldMapperServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/TimestampFieldMapperServiceTests.java new file mode 100644 index 0000000000000..8617106d5cc28 --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/TimestampFieldMapperServiceTests.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.List; + +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class TimestampFieldMapperServiceTests extends ESSingleNodeTestCase { + + private static final String DOC = """ + { + "@timestamp": "$time", + "metricset": "pod", + "k8s": { + "pod": { + "name": "dog", + "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", + "ip": "10.10.55.3", + "network": { + "tx": 1434595272, + "rx": 530605511 + } + } + } + } + """; + + @Override + protected Collection> getPlugins() { + return List.of(DataStreamsPlugin.class); + } + + public void testGetTimestampFieldTypeForTsdbDataStream() throws IOException { + createTemplate(true); + IndexResponse indexResponse = indexDoc(); + + var indicesService = getInstanceFromNode(IndicesService.class); + var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex()); + assertThat(result, notNullValue()); + } + + public void testGetTimestampFieldTypeForDataStream() throws IOException { + createTemplate(false); + IndexResponse indexResponse = indexDoc(); + + var indicesService = getInstanceFromNode(IndicesService.class); + var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex()); + assertThat(result, nullValue()); + } + + private IndexResponse indexDoc() { + Instant time = Instant.now(); + var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE); + indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON); + return client().index(indexRequest).actionGet(); + } + + private void createTemplate(boolean tsdb) throws IOException { + var mappingTemplate = """ + { + "_doc":{ + "properties": { + "metricset": { + "type": "keyword", + "time_series_dimension": true + } + } + } + }"""; + var templateSettings = Settings.builder().put("index.mode", tsdb ? "time_series" : "standard"); + var request = new PutComposableIndexTemplateAction.Request("id"); + request.indexTemplate( + new ComposableIndexTemplate( + List.of("k8s*"), + new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null), + null, + null, + null, + null, + new ComposableIndexTemplate.DataStreamTemplate(false, false), + null + ) + ); + client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet(); + } + + private static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 919a2f42df53d..cd794aa4151f1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -1000,6 +1000,15 @@ public IndexLongFieldRange getTimestampRange() { return timestampRange; } + /** + * @return the time range this index represents if this index is in time series mode. + * Otherwise null is returned. + */ + @Nullable + public IndexLongFieldRange getTimeSeriesTimestampRange() { + return IndexSettings.MODE.get(settings).getConfiguredTimestampRange(this); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -1745,8 +1754,10 @@ public IndexMetadata build() { } final boolean isSearchableSnapshot = SearchableSnapshotsSettings.isSearchableSnapshotStore(settings); + final String indexMode = settings.get(IndexSettings.MODE.getKey()); final boolean isTsdb = IndexSettings.isTimeSeriesModeEnabled() - && IndexMode.TIME_SERIES.getName().equals(settings.get(IndexSettings.MODE.getKey())); + && indexMode != null + && IndexMode.TIME_SERIES.getName().equals(indexMode.toLowerCase(Locale.ROOT)); return new IndexMetadata( new Index(index, uuid), version, diff --git a/server/src/main/java/org/elasticsearch/index/IndexMode.java b/server/src/main/java/org/elasticsearch/index/IndexMode.java index c149f214212d5..71eee97ebfd00 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexMode.java +++ b/server/src/main/java/org/elasticsearch/index/IndexMode.java @@ -27,6 +27,8 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardLongFieldRange; import java.io.IOException; import java.util.Arrays; @@ -104,6 +106,11 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) { public DocumentDimensions buildDocumentDimensions() { return new DocumentDimensions.OnlySingleValueAllowed(); } + + @Override + public IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata) { + return null; + } }, TIME_SERIES("time_series") { @Override @@ -185,6 +192,13 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) { public DocumentDimensions buildDocumentDimensions() { return new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(); } + + @Override + public IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata) { + long min = indexMetadata.getTimeSeriesStart().toEpochMilli(); + long max = indexMetadata.getTimeSeriesEnd().toEpochMilli(); + return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(min, max)); + } }; protected static String tsdbMode() { @@ -286,6 +300,14 @@ public String getName() { */ public abstract DocumentDimensions buildDocumentDimensions(); + /** + * @return the time range based on the provided index settings and index mode implementation. + * Otherwise null is returned. + * @param indexMetadata + */ + @Nullable + public abstract IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata); + public static IndexMode fromString(String value) { return switch (value) { case "standard" -> IndexMode.STANDARD; diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java index 7c12000210d1d..181dc50eb1c96 100644 --- a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java +++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java @@ -11,7 +11,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.shard.IndexLongFieldRange; @@ -27,8 +26,7 @@ * don't hold queried data. See IndexMetadata#getTimestampRange() for more details */ public class CoordinatorRewriteContext extends QueryRewriteContext { - private final Index index; - private IndexLongFieldRange indexLongFieldRange; + private final IndexLongFieldRange indexLongFieldRange; private final DateFieldMapper.DateFieldType timestampFieldType; public CoordinatorRewriteContext( @@ -36,12 +34,10 @@ public CoordinatorRewriteContext( NamedWriteableRegistry writeableRegistry, Client client, LongSupplier nowInMillis, - Index index, IndexLongFieldRange indexLongFieldRange, DateFieldMapper.DateFieldType timestampFieldType ) { super(parserConfig, writeableRegistry, client, nowInMillis); - this.index = index; this.indexLongFieldRange = indexLongFieldRange; this.timestampFieldType = timestampFieldType; } diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java index 2068b7fee2405..cb7d7b63bdd16 100644 --- a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java +++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java @@ -10,7 +10,6 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; @@ -48,12 +47,19 @@ public CoordinatorRewriteContextProvider( @Nullable public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) { - ClusterState clusterState = clusterStateSupplier.get(); - IndexMetadata indexMetadata = clusterState.metadata().index(index); + var clusterState = clusterStateSupplier.get(); + var indexMetadata = clusterState.metadata().index(index); - if (indexMetadata == null || indexMetadata.getTimestampRange().containsAllShardRanges() == false) { + if (indexMetadata == null) { return null; } + IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange(); + if (timestampRange.containsAllShardRanges() == false) { + timestampRange = indexMetadata.getTimeSeriesTimestampRange(); + if (timestampRange == null) { + return null; + } + } DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index); @@ -61,7 +67,6 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) { return null; } - IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange(); - return new CoordinatorRewriteContext(parserConfig, writeableRegistry, client, nowInMillis, index, timestampRange, dateFieldType); + return new CoordinatorRewriteContext(parserConfig, writeableRegistry, client, nowInMillis, timestampRange, dateFieldType); } } diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java index dd895cf358825..85ff54472f7a1 100644 --- a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java +++ b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -142,6 +142,12 @@ private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) { if (indexMetadata == null) { return false; } + + if (indexMetadata.getTimeSeriesTimestampRange() != null) { + // Tsdb indices have @timestamp field and index.time_series.start_time / index.time_series.end_time range + return true; + } + final IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange(); return timestampRange.isComplete() && timestampRange != IndexLongFieldRange.UNKNOWN; } diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 62341b2d57f1e..64aad03f948fa 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -503,7 +505,7 @@ public void testCanMatchFilteringOnCoordinatorThatCanBeSkipped() throws Exceptio } assignShardsAndExecuteCanMatchPhase( - dataStream, + List.of(dataStream), regularIndices, contextProviderBuilder.build(), queryBuilder, @@ -570,7 +572,7 @@ public void testCanMatchFilteringOnCoordinatorParsingFails() throws Exception { } assignShardsAndExecuteCanMatchPhase( - dataStream, + List.of(dataStream), regularIndices, contextProviderBuilder.build(), queryBuilder, @@ -622,7 +624,7 @@ public void testCanMatchFilteringOnCoordinatorThatCanNotBeSkipped() throws Excep } assignShardsAndExecuteCanMatchPhase( - dataStream, + List.of(dataStream), regularIndices, contextProviderBuilder.build(), queryBuilder, @@ -630,6 +632,76 @@ public void testCanMatchFilteringOnCoordinatorThatCanNotBeSkipped() throws Excep ); } + public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedTsdb() throws Exception { + DataStream dataStream1; + { + Index index1 = new Index(".ds-ds10001", UUIDs.base64UUID()); + Index index2 = new Index(".ds-ds10002", UUIDs.base64UUID()); + dataStream1 = DataStreamTestHelper.newInstance("ds1", List.of(index1, index2)); + } + DataStream dataStream2; + { + Index index1 = new Index(".ds-ds20001", UUIDs.base64UUID()); + Index index2 = new Index(".ds-ds20002", UUIDs.base64UUID()); + dataStream2 = DataStreamTestHelper.newInstance("ds2", List.of(index1, index2)); + } + + long indexMinTimestamp = randomLongBetween(0, 5000); + long indexMaxTimestamp = randomLongBetween(indexMinTimestamp, 5000 * 2); + StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder(); + for (Index index : dataStream1.getIndices()) { + contextProviderBuilder.addIndexMinMaxTimestamps(index, indexMinTimestamp, indexMaxTimestamp); + } + for (Index index : dataStream2.getIndices()) { + contextProviderBuilder.addIndex(index); + } + + RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("@timestamp"); + // We query a range outside of the timestamp range covered by both datastream indices + rangeQueryBuilder.from(indexMaxTimestamp + 1).to(indexMaxTimestamp + 2); + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder().filter(rangeQueryBuilder); + + if (randomBoolean()) { + // Add an additional filter that cannot be evaluated in the coordinator but shouldn't + // affect the end result as we're filtering + queryBuilder.filter(new TermQueryBuilder("fake", "value")); + } + + assignShardsAndExecuteCanMatchPhase( + List.of(dataStream1, dataStream2), + List.of(), + contextProviderBuilder.build(), + queryBuilder, + (updatedSearchShardIterators, requests) -> { + var skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList(); + var nonSkippedShards = updatedSearchShardIterators.stream() + .filter(searchShardIterator -> searchShardIterator.skip() == false) + .toList(); + + boolean allSkippedShardAreFromDataStream1 = skippedShards.stream() + .allMatch(shardIterator -> dataStream1.getIndices().contains(shardIterator.shardId().getIndex())); + assertThat(allSkippedShardAreFromDataStream1, equalTo(true)); + boolean allNonSkippedShardAreFromDataStream1 = nonSkippedShards.stream() + .noneMatch(shardIterator -> dataStream1.getIndices().contains(shardIterator.shardId().getIndex())); + assertThat(allNonSkippedShardAreFromDataStream1, equalTo(true)); + boolean allRequestMadeToDataStream1 = requests.stream() + .allMatch(request -> dataStream1.getIndices().contains(request.shardId().getIndex())); + assertThat(allRequestMadeToDataStream1, equalTo(false)); + + boolean allSkippedShardAreFromDataStream2 = skippedShards.stream() + .allMatch(shardIterator -> dataStream2.getIndices().contains(shardIterator.shardId().getIndex())); + assertThat(allSkippedShardAreFromDataStream2, equalTo(false)); + boolean allNonSkippedShardAreFromDataStream2 = nonSkippedShards.stream() + .noneMatch(shardIterator -> dataStream2.getIndices().contains(shardIterator.shardId().getIndex())); + assertThat(allNonSkippedShardAreFromDataStream2, equalTo(false)); + boolean allRequestMadeToDataStream2 = requests.stream() + .allMatch(request -> dataStream2.getIndices().contains(request.shardId().getIndex())); + assertThat(allRequestMadeToDataStream2, equalTo(true)); + } + ); + } + private void assertAllShardsAreQueried(List updatedSearchShardIterators, List requests) { int skippedShards = (int) updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).count(); @@ -646,7 +718,7 @@ private void assertAllShardsAreQueried(List updatedSearchSh } private > void assignShardsAndExecuteCanMatchPhase( - DataStream dataStream, + List dataStreams, List regularIndices, CoordinatorRewriteContextProvider contextProvider, AbstractQueryBuilder query, @@ -659,7 +731,9 @@ private > void assignShardsAndExecuteCanMatc lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); List indicesToSearch = new ArrayList<>(); - indicesToSearch.add(dataStream.getName()); + for (DataStream dataStream : dataStreams) { + indicesToSearch.add(dataStream.getName()); + } for (Index regularIndex : regularIndices) { indicesToSearch.add(regularIndex.getName()); } @@ -667,17 +741,19 @@ private > void assignShardsAndExecuteCanMatc String[] indices = indicesToSearch.toArray(new String[0]); OriginalIndices originalIndices = new OriginalIndices(indices, SearchRequest.DEFAULT_INDICES_OPTIONS); - boolean atLeastOnePrimaryAssigned = false; final List originalShardIters = new ArrayList<>(); - for (Index dataStreamIndex : dataStream.getIndices()) { - // If we have to execute the can match request against all the shards - // and none is assigned, the phase is considered as failed meaning that the next phase won't be executed - boolean withAssignedPrimaries = randomBoolean() || atLeastOnePrimaryAssigned == false; - int numShards = randomIntBetween(1, 6); - originalShardIters.addAll( - getShardsIter(dataStreamIndex, originalIndices, numShards, false, withAssignedPrimaries ? primaryNode : null, null) - ); - atLeastOnePrimaryAssigned |= withAssignedPrimaries; + for (var dataStream : dataStreams) { + boolean atLeastOnePrimaryAssigned = false; + for (var dataStreamIndex : dataStream.getIndices()) { + // If we have to execute the can match request against all the shards + // and none is assigned, the phase is considered as failed meaning that the next phase won't be executed + boolean withAssignedPrimaries = randomBoolean() || atLeastOnePrimaryAssigned == false; + int numShards = randomIntBetween(1, 6); + originalShardIters.addAll( + getShardsIter(dataStreamIndex, originalIndices, numShards, false, withAssignedPrimaries ? primaryNode : null, null) + ); + atLeastOnePrimaryAssigned |= withAssignedPrimaries; + } } for (Index regularIndex : regularIndices) { @@ -706,8 +782,10 @@ private > void assignShardsAndExecuteCanMatc } Map aliasFilters = new HashMap<>(); - for (Index dataStreamIndex : dataStream.getIndices()) { - aliasFilters.put(dataStreamIndex.getUUID(), aliasFilter); + for (var dataStream : dataStreams) { + for (var dataStreamIndex : dataStream.getIndices()) { + aliasFilters.put(dataStreamIndex.getUUID(), aliasFilter); + } } for (Index regularIndex : regularIndices) { @@ -806,6 +884,43 @@ private void addIndexMinMaxTimestamps(Index index, String fieldName, long minTim fields.put(index, new DateFieldMapper.DateFieldType(fieldName)); } + private void addIndexMinMaxTimestamps(Index index, long minTimestamp, long maxTimestamp) { + if (clusterState.metadata().index(index) != null) { + throw new IllegalArgumentException("Min/Max timestamps for " + index + " were already defined"); + } + + Settings.Builder indexSettings = settings(Version.CURRENT).put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "a_field") + .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(minTimestamp)) + .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(maxTimestamp)); + + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName()) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(0); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()).put(indexMetadataBuilder); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + fields.put(index, new DateFieldMapper.DateFieldType("@timestamp")); + } + + private void addIndex(Index index) { + if (clusterState.metadata().index(index) != null) { + throw new IllegalArgumentException("Min/Max timestamps for " + index + " were already defined"); + } + + Settings.Builder indexSettings = settings(Version.CURRENT).put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName()) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(0); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()).put(indexMetadataBuilder); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + fields.put(index, new DateFieldMapper.DateFieldType("@timestamp")); + } + public CoordinatorRewriteContextProvider build() { return new CoordinatorRewriteContextProvider( XContentParserConfiguration.EMPTY,