Skip to content

Commit

Permalink
Skip backing indices with a disjoint range on @timestamp field. (#85162)
Browse files Browse the repository at this point in the history
Implicitly skip backing indices with a time series range that doesn't
match with a required filter on @timestamp field.

Relates to #74660
  • Loading branch information
martijnvg authored Jul 4, 2022
1 parent 7930d98 commit 577d3e2
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
Expand All @@ -21,12 +22,16 @@
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.xcontent.XContentType;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -36,6 +41,18 @@

public class TSDBIndexingIT extends ESSingleNodeTestCase {

public static final String MAPPING_TEMPLATE = """
{
"_doc":{
"properties": {
"metricset": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}""";

private static final String DOC = """
{
"@timestamp": "$time",
Expand Down Expand Up @@ -341,6 +358,76 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception {
assertThat(e.getCause().getMessage(), equalTo("[index.routing_path] requires [index.mode=time_series]"));
}

public void testSkippingShards() throws Exception {
Instant time = Instant.now();
{
var templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build();
var request = new PutComposableIndexTemplateAction.Request("id1");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-1"),
new Template(templateSettings, new CompressedXContent(MAPPING_TEMPLATE), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
var indexRequest = new IndexRequest("pattern-1").opType(DocWriteRequest.OpType.CREATE).setRefreshPolicy("true");
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
client().index(indexRequest).actionGet();
}
{
var request = new PutComposableIndexTemplateAction.Request("id2");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-2"),
new Template(null, new CompressedXContent(MAPPING_TEMPLATE), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
var indexRequest = new IndexRequest("pattern-2").opType(DocWriteRequest.OpType.CREATE).setRefreshPolicy("true");
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
client().index(indexRequest).actionGet();
}
{
var matchingRange = new SearchSourceBuilder().query(
new RangeQueryBuilder("@timestamp").from(time.minusSeconds(1).toEpochMilli()).to(time.plusSeconds(1).toEpochMilli())
);
var searchRequest = new SearchRequest("pattern-*");
searchRequest.setPreFilterShardSize(1);
searchRequest.source(matchingRange);
var searchResponse = client().search(searchRequest).actionGet();
ElasticsearchAssertions.assertHitCount(searchResponse, 2);
assertThat(searchResponse.getTotalShards(), equalTo(2));
assertThat(searchResponse.getSkippedShards(), equalTo(0));
assertThat(searchResponse.getSuccessfulShards(), equalTo(2));
}
{
var nonMatchingRange = new SearchSourceBuilder().query(
new RangeQueryBuilder("@timestamp").from(time.minus(2, ChronoUnit.DAYS).toEpochMilli())
.to(time.minus(1, ChronoUnit.DAYS).toEpochMilli())
);
var searchRequest = new SearchRequest("pattern-*");
searchRequest.setPreFilterShardSize(1);
searchRequest.source(nonMatchingRange);
var searchResponse = client().search(searchRequest).actionGet();
ElasticsearchAssertions.assertNoSearchHits(searchResponse);
assertThat(searchResponse.getTotalShards(), equalTo(2));
assertThat(searchResponse.getSkippedShards(), equalTo(1));
assertThat(searchResponse.getSuccessfulShards(), equalTo(2));
}
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.List;

import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class TimestampFieldMapperServiceTests extends ESSingleNodeTestCase {

private static final String DOC = """
{
"@timestamp": "$time",
"metricset": "pod",
"k8s": {
"pod": {
"name": "dog",
"uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9",
"ip": "10.10.55.3",
"network": {
"tx": 1434595272,
"rx": 530605511
}
}
}
}
""";

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(DataStreamsPlugin.class);
}

public void testGetTimestampFieldTypeForTsdbDataStream() throws IOException {
createTemplate(true);
IndexResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
assertThat(result, notNullValue());
}

public void testGetTimestampFieldTypeForDataStream() throws IOException {
createTemplate(false);
IndexResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
assertThat(result, nullValue());
}

private IndexResponse indexDoc() {
Instant time = Instant.now();
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
return client().index(indexRequest).actionGet();
}

private void createTemplate(boolean tsdb) throws IOException {
var mappingTemplate = """
{
"_doc":{
"properties": {
"metricset": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}""";
var templateSettings = Settings.builder().put("index.mode", tsdb ? "time_series" : "standard");
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
}

private static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,15 @@ public IndexLongFieldRange getTimestampRange() {
return timestampRange;
}

/**
* @return the time range this index represents if this index is in time series mode.
* Otherwise <code>null</code> is returned.
*/
@Nullable
public IndexLongFieldRange getTimeSeriesTimestampRange() {
return IndexSettings.MODE.get(settings).getConfiguredTimestampRange(this);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -1745,8 +1754,10 @@ public IndexMetadata build() {
}

final boolean isSearchableSnapshot = SearchableSnapshotsSettings.isSearchableSnapshotStore(settings);
final String indexMode = settings.get(IndexSettings.MODE.getKey());
final boolean isTsdb = IndexSettings.isTimeSeriesModeEnabled()
&& IndexMode.TIME_SERIES.getName().equals(settings.get(IndexSettings.MODE.getKey()));
&& indexMode != null
&& IndexMode.TIME_SERIES.getName().equals(indexMode.toLowerCase(Locale.ROOT));
return new IndexMetadata(
new Index(index, uuid),
version,
Expand Down
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardLongFieldRange;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -104,6 +106,11 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
public DocumentDimensions buildDocumentDimensions() {
return new DocumentDimensions.OnlySingleValueAllowed();
}

@Override
public IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata) {
return null;
}
},
TIME_SERIES("time_series") {
@Override
Expand Down Expand Up @@ -185,6 +192,13 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {
public DocumentDimensions buildDocumentDimensions() {
return new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder();
}

@Override
public IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata) {
long min = indexMetadata.getTimeSeriesStart().toEpochMilli();
long max = indexMetadata.getTimeSeriesEnd().toEpochMilli();
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(min, max));
}
};

protected static String tsdbMode() {
Expand Down Expand Up @@ -286,6 +300,14 @@ public String getName() {
*/
public abstract DocumentDimensions buildDocumentDimensions();

/**
* @return the time range based on the provided index settings and index mode implementation.
* Otherwise <code>null</code> is returned.
* @param indexMetadata
*/
@Nullable
public abstract IndexLongFieldRange getConfiguredTimestampRange(IndexMetadata indexMetadata);

public static IndexMode fromString(String value) {
return switch (value) {
case "standard" -> IndexMode.STANDARD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.IndexLongFieldRange;
Expand All @@ -27,21 +26,18 @@
* don't hold queried data. See IndexMetadata#getTimestampRange() for more details
*/
public class CoordinatorRewriteContext extends QueryRewriteContext {
private final Index index;
private IndexLongFieldRange indexLongFieldRange;
private final IndexLongFieldRange indexLongFieldRange;
private final DateFieldMapper.DateFieldType timestampFieldType;

public CoordinatorRewriteContext(
XContentParserConfiguration parserConfig,
NamedWriteableRegistry writeableRegistry,
Client client,
LongSupplier nowInMillis,
Index index,
IndexLongFieldRange indexLongFieldRange,
DateFieldMapper.DateFieldType timestampFieldType
) {
super(parserConfig, writeableRegistry, client, nowInMillis);
this.index = index;
this.indexLongFieldRange = indexLongFieldRange;
this.timestampFieldType = timestampFieldType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -48,20 +47,26 @@ public CoordinatorRewriteContextProvider(

@Nullable
public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
ClusterState clusterState = clusterStateSupplier.get();
IndexMetadata indexMetadata = clusterState.metadata().index(index);
var clusterState = clusterStateSupplier.get();
var indexMetadata = clusterState.metadata().index(index);

if (indexMetadata == null || indexMetadata.getTimestampRange().containsAllShardRanges() == false) {
if (indexMetadata == null) {
return null;
}
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
if (timestampRange.containsAllShardRanges() == false) {
timestampRange = indexMetadata.getTimeSeriesTimestampRange();
if (timestampRange == null) {
return null;
}
}

DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);

if (dateFieldType == null) {
return null;
}

IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
return new CoordinatorRewriteContext(parserConfig, writeableRegistry, client, nowInMillis, index, timestampRange, dateFieldType);
return new CoordinatorRewriteContext(parserConfig, writeableRegistry, client, nowInMillis, timestampRange, dateFieldType);
}
}
Loading

0 comments on commit 577d3e2

Please sign in to comment.