Skip to content

Commit

Permalink
Search coordinator uses event.ingested in cluster state to do rewrites (
Browse files Browse the repository at this point in the history
elastic#110352) (elastic#110782)

Min/max range for the event.ingested timestamp field (part of Elastic Common
Schema) was added to IndexMetadata in cluster state for searchable snapshots
in elastic#106252.

This commit modifies the search coordinator to rewrite searches to MatchNone
if the query searches a range of event.ingested that, according to the min/max
range in cluster state, is known not to overlap the data held by the index.
This is the same behavior we currently have for the @timestamp field.
  • Loading branch information
quux00 authored Jul 11, 2024
1 parent 0ee975b commit 9092394
Show file tree
Hide file tree
Showing 12 changed files with 1,034 additions and 173 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/110352.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 110352
summary: Search coordinator uses `event.ingested` in cluster state to do rewrites
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testGetTimestampFieldTypeForTsdbDataStream() throws IOException {
DocWriteResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
var result = indicesService.getTimestampFieldTypeInfo(indexResponse.getShardId().getIndex());
assertThat(result, notNullValue());
}

Expand All @@ -70,7 +70,7 @@ public void testGetTimestampFieldTypeForDataStream() throws IOException {
DocWriteResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
var result = indicesService.getTimestampFieldTypeInfo(indexResponse.getShardId().getIndex());
assertThat(result, nullValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
package org.elasticsearch.index.query;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.indices.DateFieldRangeInfo;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.Collections;
Expand All @@ -23,19 +26,24 @@
* Context object used to rewrite {@link QueryBuilder} instances into simplified version in the coordinator.
* Instances of this object rely on information stored in the {@code IndexMetadata} for certain indices.
* Right now this context object is able to rewrite range queries that include a known timestamp field
* (i.e. the timestamp field for DataStreams) into a MatchNoneQueryBuilder and skip the shards that
* don't hold queried data. See IndexMetadata#getTimestampRange() for more details
* (i.e. the timestamp field for DataStreams or the 'event.ingested' field in ECS) into a MatchNoneQueryBuilder
* and skip the shards that don't hold queried data. See IndexMetadata for more details.
*/
public class CoordinatorRewriteContext extends QueryRewriteContext {
private final IndexLongFieldRange indexLongFieldRange;
private final DateFieldMapper.DateFieldType timestampFieldType;
private final DateFieldRangeInfo dateFieldRangeInfo;

/**
* Context for coordinator search rewrites based on time ranges for the @timestamp field and/or 'event.ingested' field
* @param parserConfig
* @param client
* @param nowInMillis
* @param dateFieldRangeInfo range and field type info for @timestamp and 'event.ingested'
*/
public CoordinatorRewriteContext(
XContentParserConfiguration parserConfig,
Client client,
LongSupplier nowInMillis,
IndexLongFieldRange indexLongFieldRange,
DateFieldMapper.DateFieldType timestampFieldType
DateFieldRangeInfo dateFieldRangeInfo
) {
super(
parserConfig,
Expand All @@ -53,29 +61,98 @@ public CoordinatorRewriteContext(
null,
null
);
this.indexLongFieldRange = indexLongFieldRange;
this.timestampFieldType = timestampFieldType;
this.dateFieldRangeInfo = dateFieldRangeInfo;
}

long getMinTimestamp() {
return indexLongFieldRange.getMin();
/**
 * Returns the lower bound of the time range held by this context for the given field.
 * Time-range based coordinator rewrites only support the '@timestamp' and 'event.ingested'
 * fields, so every other field name is rejected.
 * @param fieldName Must be DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME
 * @return min value of the range this context holds for the field
 * @throws IllegalArgumentException if fieldName is not one of the two supported fields
 */
long getMinTimestamp(String fieldName) {
    if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
        return dateFieldRangeInfo.getTimestampRange().getMin();
    }
    if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
        return dateFieldRangeInfo.getEventIngestedRange().getMin();
    }
    // neither supported field matched: reject instead of guessing a range
    final String message = Strings.format(
        "Only [%s] or [%s] fields are supported for min timestamp coordinator rewrites, but got: [%s]",
        DataStream.TIMESTAMP_FIELD_NAME,
        IndexMetadata.EVENT_INGESTED_FIELD_NAME,
        fieldName
    );
    throw new IllegalArgumentException(message);
}

long getMaxTimestamp() {
return indexLongFieldRange.getMax();
/**
 * Returns the upper bound of the time range held by this context for the given field.
 * Time-range based coordinator rewrites only support the '@timestamp' and 'event.ingested'
 * fields, so every other field name is rejected.
 * @param fieldName Must be DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME
 * @return max value of the range this context holds for the field
 * @throws IllegalArgumentException if fieldName is not one of the two supported fields
 */
long getMaxTimestamp(String fieldName) {
    if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
        return dateFieldRangeInfo.getTimestampRange().getMax();
    }
    if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
        return dateFieldRangeInfo.getEventIngestedRange().getMax();
    }
    // neither supported field matched: reject instead of guessing a range
    final String message = Strings.format(
        "Only [%s] or [%s] fields are supported for max timestamp coordinator rewrites, but got: [%s]",
        DataStream.TIMESTAMP_FIELD_NAME,
        IndexMetadata.EVENT_INGESTED_FIELD_NAME,
        fieldName
    );
    throw new IllegalArgumentException(message);
}

boolean hasTimestampData() {
return indexLongFieldRange.isComplete() && indexLongFieldRange != IndexLongFieldRange.EMPTY;
/**
 * Tells whether this context holds a usable time range for the '@timestamp' or the
 * 'event.ingested' field: the range must be complete and must not be the
 * {@code IndexLongFieldRange.EMPTY} sentinel.
 * Time-range based coordinator rewrites only support those two fields, so every other
 * field name is rejected.
 * @param fieldName Must be DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME
 * @return true if the range held for the field is complete and is not IndexLongFieldRange.EMPTY
 * @throws IllegalArgumentException if fieldName is not one of the two supported fields
 */
boolean hasTimestampData(String fieldName) {
    final IndexLongFieldRange range;
    if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
        range = dateFieldRangeInfo.getTimestampRange();
    } else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
        range = dateFieldRangeInfo.getEventIngestedRange();
    } else {
        throw new IllegalArgumentException(
            Strings.format(
                "Only [%s] or [%s] fields are supported for min/max timestamp coordinator rewrites, but got: [%s]",
                DataStream.TIMESTAMP_FIELD_NAME,
                IndexMetadata.EVENT_INGESTED_FIELD_NAME,
                fieldName
            )
        );
    }
    // NOTE(review): the range is dereferenced without a null check. The context provider appears to
    // be able to pass a null @timestamp range when only the event.ingested range is ready - confirm
    // that this cannot be reached with a null range.
    // EMPTY is compared by identity on purpose: it is a shared sentinel instance.
    return range.isComplete() && range != IndexLongFieldRange.EMPTY;
}

/**
* Get the MappedFieldType for either the '@timestamp' or the 'event.ingested' field.
* @param fieldName name of the field whose type is requested
* @return the field type held for that field, or null if fieldName was not
* DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME.
*/
@Nullable
public MappedFieldType getFieldType(String fieldName) {
if (fieldName.equals(timestampFieldType.name()) == false) {
if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getTimestampFieldType();
} else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getEventIngestedFieldType();
} else {
return null;
}

return timestampFieldType;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.indices.DateFieldRangeInfo;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.function.Function;
Expand All @@ -25,14 +26,14 @@ public class CoordinatorRewriteContextProvider {
private final Client client;
private final LongSupplier nowInMillis;
private final Supplier<ClusterState> clusterStateSupplier;
private final Function<Index, DateFieldMapper.DateFieldType> mappingSupplier;
private final Function<Index, DateFieldRangeInfo> mappingSupplier;

public CoordinatorRewriteContextProvider(
XContentParserConfiguration parserConfig,
Client client,
LongSupplier nowInMillis,
Supplier<ClusterState> clusterStateSupplier,
Function<Index, DateFieldMapper.DateFieldType> mappingSupplier
Function<Index, DateFieldRangeInfo> mappingSupplier
) {
this.parserConfig = parserConfig;
this.client = client;
Expand All @@ -49,18 +50,33 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
if (indexMetadata == null) {
return null;
}
DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);
if (dateFieldType == null) {

DateFieldRangeInfo dateFieldRangeInfo = mappingSupplier.apply(index);
if (dateFieldRangeInfo == null) {
return null;
}

DateFieldMapper.DateFieldType timestampFieldType = dateFieldRangeInfo.getTimestampFieldType();
DateFieldMapper.DateFieldType eventIngestedFieldType = dateFieldRangeInfo.getEventIngestedFieldType();
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
IndexLongFieldRange eventIngestedRange = indexMetadata.getEventIngestedRange();

if (timestampRange.containsAllShardRanges() == false) {
timestampRange = indexMetadata.getTimeSeriesTimestampRange(dateFieldType);
if (timestampRange == null) {
// if @timestamp range is not present or not ready in cluster state, fallback to using time series range (if present)
timestampRange = indexMetadata.getTimeSeriesTimestampRange(timestampFieldType);
// if timestampRange in the time series is null AND the eventIngestedRange is not ready for use, return null (no coord rewrite)
if (timestampRange == null && eventIngestedRange.containsAllShardRanges() == false) {
return null;
}
}

return new CoordinatorRewriteContext(parserConfig, client, nowInMillis, timestampRange, dateFieldType);
// the DateFieldRangeInfo from the mappingSupplier only has field types, but not ranges
// so create a new object with ranges pulled from cluster state
return new CoordinatorRewriteContext(
parserConfig,
client,
nowInMillis,
new DateFieldRangeInfo(timestampFieldType, timestampRange, eventIngestedFieldType, eventIngestedRange)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,11 @@ public String getWriteableName() {
protected MappedFieldType.Relation getRelation(final CoordinatorRewriteContext coordinatorRewriteContext) {
final MappedFieldType fieldType = coordinatorRewriteContext.getFieldType(fieldName);
if (fieldType instanceof final DateFieldMapper.DateFieldType dateFieldType) {
if (coordinatorRewriteContext.hasTimestampData() == false) {
if (coordinatorRewriteContext.hasTimestampData(fieldName) == false) {
return MappedFieldType.Relation.DISJOINT;
}
long minTimestamp = coordinatorRewriteContext.getMinTimestamp();
long maxTimestamp = coordinatorRewriteContext.getMaxTimestamp();
long minTimestamp = coordinatorRewriteContext.getMinTimestamp(fieldName);
long maxTimestamp = coordinatorRewriteContext.getMaxTimestamp(fieldName);
DateMathParser dateMathParser = getForceDateParser();
return dateFieldType.isFieldWithinQuery(
minTimestamp,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices;

import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;

/**
 * Immutable holder that pairs the date field type of the '@timestamp' and 'event.ingested'
 * fields with the min/max range recorded for each of them.
 * The holder performs no validation; it stores and returns exactly the references it is given.
 */
public final class DateFieldRangeInfo {

    // '@timestamp' field: type and range
    private final DateFieldMapper.DateFieldType timestampFieldType;
    private final IndexLongFieldRange timestampRange;

    // 'event.ingested' field: type and range
    private final DateFieldMapper.DateFieldType eventIngestedFieldType;
    private final IndexLongFieldRange eventIngestedRange;

    /**
     * @param timestampFieldType field type of the '@timestamp' field
     * @param timestampRange range held for the '@timestamp' field
     * @param eventIngestedFieldType field type of the 'event.ingested' field
     * @param eventIngestedRange range held for the 'event.ingested' field
     */
    public DateFieldRangeInfo(
        DateFieldMapper.DateFieldType timestampFieldType,
        IndexLongFieldRange timestampRange,
        DateFieldMapper.DateFieldType eventIngestedFieldType,
        IndexLongFieldRange eventIngestedRange
    ) {
        this.timestampFieldType = timestampFieldType;
        this.eventIngestedFieldType = eventIngestedFieldType;
        this.timestampRange = timestampRange;
        this.eventIngestedRange = eventIngestedRange;
    }

    /** @return the field type given for the '@timestamp' field */
    public DateFieldMapper.DateFieldType getTimestampFieldType() {
        return this.timestampFieldType;
    }

    /** @return the range given for the '@timestamp' field */
    public IndexLongFieldRange getTimestampRange() {
        return this.timestampRange;
    }

    /** @return the field type given for the 'event.ingested' field */
    public DateFieldMapper.DateFieldType getEventIngestedFieldType() {
        return this.eventIngestedFieldType;
    }

    /** @return the range given for the 'event.ingested' field */
    public IndexLongFieldRange getEventIngestedRange() {
        return this.eventIngestedRange;
    }
}
19 changes: 13 additions & 6 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MapperRegistry;
Expand Down Expand Up @@ -1764,7 +1763,13 @@ public DataRewriteContext getDataRewriteContext(LongSupplier nowInMillis) {
}

public CoordinatorRewriteContextProvider getCoordinatorRewriteContextProvider(LongSupplier nowInMillis) {
return new CoordinatorRewriteContextProvider(parserConfig, client, nowInMillis, clusterService::state, this::getTimestampFieldType);
return new CoordinatorRewriteContextProvider(
parserConfig,
client,
nowInMillis,
clusterService::state,
this::getTimestampFieldTypeInfo
);
}

/**
Expand Down Expand Up @@ -1854,14 +1859,16 @@ public boolean allPendingDanglingIndicesWritten() {
}

/**
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
* @return DateFieldRangeInfo holding the field types of the {@code @timestamp} and {@code event.ingested} fields of the index.
* or {@code null} if:
* - the index is not found,
* - the field is not found, or
* - the field is not a timestamp field.
* - the mapping is not known yet, or
* - the index does not have a useful timestamp field.
*/
@Nullable
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
return timestampFieldMapperService.getTimestampFieldType(index);
public DateFieldRangeInfo getTimestampFieldTypeInfo(Index index) {
return timestampFieldMapperService.getTimestampFieldTypeMap(index);
}

public IndexScopedSettings getIndexScopedSettings() {
Expand Down
Loading

0 comments on commit 9092394

Please sign in to comment.