From 26caaa4ae196e1c2ef47dfd4095b2ed6bbf19ea0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 15 Dec 2020 13:07:29 +0100 Subject: [PATCH] [7.x] Apply the can-match phase on the coordinator when min/max field data is available there. (#66319) This commit introduces an optimization that allows the coordinator to directly skip shards of time-based indices that cannot match the query. This is possible for frozen indices and searchable snapshots, since those store their min/max timestamp range in their IndexMetadata (introduced in #65689). For indices that don't have that information available, the behaviour is the same as it used to be. Backport of #65583 --- .../search/AbstractSearchAsyncAction.java | 12 +- .../search/CanMatchPreFilterSearchPhase.java | 54 +- .../action/search/TransportSearchAction.java | 4 +- .../index/mapper/DateFieldMapper.java | 22 +- .../query/CoordinatorRewriteContext.java | 83 +++ .../CoordinatorRewriteContextProvider.java | 83 +++ .../index/query/QueryRewriteContext.java | 4 + .../index/query/RangeQueryBuilder.java | 17 + .../index/shard/IndexLongFieldRange.java | 7 + .../elasticsearch/index/shard/IndexShard.java | 9 +- .../elasticsearch/indices/IndicesService.java | 10 + .../elasticsearch/search/SearchService.java | 37 +- .../CanMatchPreFilterSearchPhaseTests.java | 400 +++++++++++++- .../action/search/SearchAsyncActionTests.java | 103 +++- .../index/engine/FrozenIndexTests.java | 14 +- ...pshotsCanMatchOnCoordinatorIntegTests.java | 507 ++++++++++++++++++ .../SearchableSnapshotsIntegTests.java | 13 +- 17 files changed, 1316 insertions(+), 63 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java create mode 100644 server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java create mode 100644 x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 18fa9968eeec4..46cd5693e3981 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -237,7 +237,7 @@ void skipShard(SearchShardIterator iterator) { successfulShardExecution(iterator); } - private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { + protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread.
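To make the optimization concrete: the coordinator already holds each index's min/max timestamp range in IndexMetadata, so deciding whether a shard can possibly match a range filter is a pure interval comparison. The following is a minimal standalone sketch of that comparison, not the actual Elasticsearch classes; all names are illustrative and inclusive millisecond bounds are assumed.

// Sketch: the interval check behind coordinator-side shard skipping.
public final class TimestampRangeCheck {

    enum Relation { WITHIN, DISJOINT, INTERSECTS }

    static Relation relate(long indexMin, long indexMax, long queryFrom, long queryTo) {
        if (indexMin >= queryFrom && indexMax <= queryTo) {
            return Relation.WITHIN;     // every document falls inside the queried range
        }
        if (indexMax < queryFrom || indexMin > queryTo) {
            return Relation.DISJOINT;   // no document can match: the shard is skippable
        }
        return Relation.INTERSECTS;     // undecidable without asking the shard
    }

    public static void main(String[] args) {
        // The index covers [1000, 2000] but the query asks for [3000, 4000]: DISJOINT.
        System.out.println(relate(1000L, 2000L, 3000L, 4000L));
    }
}

Only a DISJOINT result allows skipping; WITHIN and INTERSECTS still require querying the shard, since other clauses of the query may rule documents out.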
If we @@ -306,11 +306,11 @@ protected abstract void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, SearchActionListener listener); - private void fork(final Runnable runnable) { + protected void fork(final Runnable runnable) { executor.execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - + assert false : "Unexpected failure"; } @Override @@ -529,7 +529,11 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) { private void successfulShardExecution(SearchShardIterator shardsIt) { final int remainingOpsOnIterator; if (shardsIt.skip()) { - remainingOpsOnIterator = shardsIt.remaining(); + // It's possible that we're skipping a shard that's unavailable + // but whose range was available in the IndexMetadata; in that + // case shardsIt.remaining() would be 0, while expectedTotalOps + // accounts for unavailable shards too. + remainingOpsOnIterator = Math.max(shardsIt.remaining(), 1); } else { remainingOpsOnIterator = shardsIt.remaining() + 1; } diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index a1dfc602fb916..cbc9007d6c3d4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -24,10 +24,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.query.CoordinatorRewriteContext; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchService.CanMatchResponse; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.MinAndMax; import org.elasticsearch.search.sort.SortOrder; @@ -58,6 +62,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory; private final GroupShardsIterator shardsIts; + private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider; CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, @@ -66,13 +71,14 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState, SearchTask task, Function, SearchPhase> phaseFactory, - SearchResponse.Clusters clusters) { + SearchResponse.Clusters clusters, CoordinatorRewriteContextProvider coordinatorRewriteContextProvider) { //We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterState, task, new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters); this.phaseFactory = phaseFactory; this.shardsIts = shardsIts; + this.coordinatorRewriteContextProvider =
coordinatorRewriteContextProvider; } @Override @@ -100,7 +106,17 @@ private GroupShardsIterator getIterator(CanMatchSearchPhase if (cardinality == 0) { // this is a special case where we have no hit but we need to get at least one search response in order // to produce a valid search result with all the aggs etc. - possibleMatches.set(0); + // Since it's possible that some of the shards that we're skipping are + // unavailable, we pick the first shard that has at least one copy + // available so that it can produce a valid search result. + int shardIndexToQuery = 0; + for (int i = 0; i < shardsIts.size(); i++) { + if (shardsIts.get(i).size() > 0) { + shardIndexToQuery = i; + break; + } + } + possibleMatches.set(shardIndexToQuery); } SearchSourceBuilder source = getRequest().source(); int i = 0; @@ -118,6 +134,40 @@ private GroupShardsIterator getIterator(CanMatchSearchPhase return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order())); } + @Override + protected void performPhaseOnShard(int shardIndex, SearchShardIterator shardIt, SearchShardTarget shard) { + CoordinatorRewriteContext coordinatorRewriteContext = + coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardIt.shardId().getIndex()); + + if (coordinatorRewriteContext == null) { + super.performPhaseOnShard(shardIndex, shardIt, shard); + return; + } + + try { + ShardSearchRequest request = buildShardSearchRequest(shardIt, shardIndex); + boolean canMatch = SearchService.queryStillMatchesAfterRewrite(request, coordinatorRewriteContext); + + // Trigger the query anyway: the shard-level can_match may still be + // able to skip this shard based on query filters that we cannot + // evaluate on the coordinator + if (canMatch) { + super.performPhaseOnShard(shardIndex, shardIt, shard); + return; + } + + CanMatchResponse result = new CanMatchResponse(canMatch, null); + result.setSearchShardTarget(shard == null ? new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias(), + shardIt.getOriginalIndices()) : shard); + result.setShardIndex(shardIndex); + fork(() -> onShardResult(result, shardIt)); + } catch (Exception e) { + // If we fail to rewrite the query on the coordinator, fall back to + // executing the can_match request on the shard.
+ super.performPhaseOnShard(shardIndex, shardIt, shard); + } + } + private static List sortShards(GroupShardsIterator shardsIts, MinAndMax[] minAndMaxes, SortOrder order) { diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 9eaedf07865d8..4d969d7f892fa 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -40,8 +40,8 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; @@ -788,7 +788,7 @@ public void run() { action.start(); } }; - }, clusters); + }, clusters, searchService.getCoordinatorRewriteContextProvider(timeProvider::getAbsoluteStartMillis)); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(executor, circuitBreaker, task.getProgressListener(), searchRequest, shardIterators.size(), diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java index e3193e56befc0..a1c5aea8fbf74 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java @@ -487,6 +487,20 @@ public Query distanceFeatureQuery(Object origin, String pivot, QueryShardContext public Relation isFieldWithinQuery(IndexReader reader, Object from, Object to, boolean includeLower, boolean includeUpper, ZoneId timeZone, DateMathParser dateParser, QueryRewriteContext context) throws IOException { + if (PointValues.size(reader, name()) == 0) { + // no points, so nothing matches + return Relation.DISJOINT; + } + + long minValue = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, name()), 0); + long maxValue = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, name()), 0); + + return isFieldWithinQuery(minValue, maxValue, from, to, includeLower, includeUpper, timeZone, dateParser, context); + } + + public Relation isFieldWithinQuery(long minValue, long maxValue, + Object from, Object to, boolean includeLower, boolean includeUpper, + ZoneId timeZone, DateMathParser dateParser, QueryRewriteContext context) throws IOException { if (dateParser == null) { dateParser = this.dateMathParser; } @@ -513,14 +527,6 @@ public Relation isFieldWithinQuery(IndexReader reader, } } - if (PointValues.size(reader, name()) == 0) { - // no points, so nothing matches - return Relation.DISJOINT; - } - - long minValue = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, name()), 0); - long maxValue = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, name()), 0); - if (minValue >= fromInclusive && maxValue <= toInclusive) { return Relation.WITHIN; } else if (maxValue < fromInclusive || minValue > toInclusive) { diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java 
b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java new file mode 100644 index 0000000000000..a4840a75e19df --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.query; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.shard.IndexLongFieldRange; + +import java.util.function.LongSupplier; + +/** + * Context object used to rewrite {@link QueryBuilder} instances into simplified version in the coordinator. + * Instances of this object rely on information stored in the {@code IndexMetadata} for certain indices. + * Right now this context object is able to rewrite range queries that include a known timestamp field + * (i.e. the timestamp field for DataStreams) into a MatchNoneQueryBuilder and skip the shards that + * don't hold queried data. 
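Sketched with simplified stand-in types (not the real Elasticsearch API), the rewrite this context enables looks roughly as follows: a range query on the one timestamp field known at the coordinator either collapses to a "match none" marker or is left untouched for the shard-level rewrite.

// Illustrative stand-ins: Query, RangeQuery and MatchNoneQuery are not ES types.
interface Query {}

final class MatchNoneQuery implements Query {}

final class RangeQuery implements Query {
    final String field;
    final long from;
    final long to;

    RangeQuery(String field, long from, long to) {
        this.field = field;
        this.from = from;
        this.to = to;
    }
}

final class CoordinatorRewriteSketch {
    private final String timestampField; // the only field resolvable at the coordinator
    private final long indexMin;
    private final long indexMax;

    CoordinatorRewriteSketch(String timestampField, long indexMin, long indexMax) {
        this.timestampField = timestampField;
        this.indexMin = indexMin;
        this.indexMax = indexMax;
    }

    Query rewrite(Query query) {
        if (query instanceof RangeQuery) {
            RangeQuery range = (RangeQuery) query;
            // Any field other than the known timestamp field has to wait for
            // the shard-level rewrite, where the full mappings are available.
            if (range.field.equals(timestampField)
                    && (indexMax < range.from || indexMin > range.to)) {
                return new MatchNoneQuery(); // provably no match: shard skippable
            }
        }
        return query; // unchanged: the shard still has to be asked
    }
}

When the rewrite yields the match-none marker, the can-match phase can mark the shard as skipped without a single network round trip.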
See IndexMetadata#getTimestampMillisRange() for more details + */ +public class CoordinatorRewriteContext extends QueryRewriteContext { + private final Index index; + private IndexLongFieldRange indexLongFieldRange; + private final DateFieldMapper.DateFieldType timestampFieldType; + + public CoordinatorRewriteContext(NamedXContentRegistry xContentRegistry, + NamedWriteableRegistry writeableRegistry, + Client client, + LongSupplier nowInMillis, + Index index, + IndexLongFieldRange indexLongFieldRange, + DateFieldMapper.DateFieldType timestampFieldType) { + super(xContentRegistry, writeableRegistry, client, nowInMillis); + this.index = index; + this.indexLongFieldRange = indexLongFieldRange; + this.timestampFieldType = timestampFieldType; + } + + long getMinTimestamp() { + return indexLongFieldRange.getMin(); + } + + long getMaxTimestamp() { + return indexLongFieldRange.getMax(); + } + + boolean hasTimestampData() { + return indexLongFieldRange.isComplete() && indexLongFieldRange != IndexLongFieldRange.EMPTY; + } + + @Nullable + public MappedFieldType getFieldType(String fieldName) { + if (fieldName.equals(timestampFieldType.name()) == false) { + return null; + } + + return timestampFieldType; + } + + @Override + public CoordinatorRewriteContext convertToCoordinatorRewriteContext() { + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java new file mode 100644 index 0000000000000..43170493d27ac --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.query; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.shard.IndexLongFieldRange; + +import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +public class CoordinatorRewriteContextProvider { + private final NamedXContentRegistry xContentRegistry; + private final NamedWriteableRegistry writeableRegistry; + private final Client client; + private final LongSupplier nowInMillis; + private final Supplier clusterStateSupplier; + private final Function mappingSupplier; + + public CoordinatorRewriteContextProvider(NamedXContentRegistry xContentRegistry, + NamedWriteableRegistry writeableRegistry, + Client client, + LongSupplier nowInMillis, + Supplier clusterStateSupplier, + Function mappingSupplier) { + this.xContentRegistry = xContentRegistry; + this.writeableRegistry = writeableRegistry; + this.client = client; + this.nowInMillis = nowInMillis; + this.clusterStateSupplier = clusterStateSupplier; + this.mappingSupplier = mappingSupplier; + } + + @Nullable + public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) { + ClusterState clusterState = clusterStateSupplier.get(); + IndexMetadata indexMetadata = clusterState.metadata().index(index); + + if (indexMetadata == null || indexMetadata.getTimestampMillisRange().containsAllShardRanges() == false) { + return null; + } + + DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index); + + if (dateFieldType == null) { + return null; + } + + IndexLongFieldRange timestampMillisRange = indexMetadata.getTimestampMillisRange(); + return new CoordinatorRewriteContext(xContentRegistry, + writeableRegistry, + client, + nowInMillis, + index, + timestampMillisRange, + dateFieldType + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java b/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java index baf60bbbc0912..97aae8c255cfa 100644 --- a/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java +++ b/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java @@ -75,6 +75,10 @@ public QueryShardContext convertToShardContext() { return null; } + public CoordinatorRewriteContext convertToCoordinatorRewriteContext() { + return null; + } + /** * Registers an async action that must be executed before the next rewrite round in order to make progress. * This should be used if a rewriteabel needs to fetch some external resources in order to be executed ie. 
a document diff --git a/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java index 2c3cc5797fca3..ba1478b0fc84f 100644 --- a/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.time.DateMathParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.FieldNamesFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; @@ -428,6 +429,22 @@ public String getWriteableName() { // Overridable for testing only protected MappedFieldType.Relation getRelation(QueryRewriteContext queryRewriteContext) throws IOException { + CoordinatorRewriteContext coordinatorRewriteContext = queryRewriteContext.convertToCoordinatorRewriteContext(); + if (coordinatorRewriteContext != null) { + final MappedFieldType fieldType = coordinatorRewriteContext.getFieldType(fieldName); + if (fieldType instanceof DateFieldMapper.DateFieldType) { + final DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) fieldType; + if (coordinatorRewriteContext.hasTimestampData() == false) { + return MappedFieldType.Relation.DISJOINT; + } + long minTimestamp = coordinatorRewriteContext.getMinTimestamp(); + long maxTimestamp = coordinatorRewriteContext.getMaxTimestamp(); + DateMathParser dateMathParser = getForceDateParser(); + return dateFieldType.isFieldWithinQuery(minTimestamp, maxTimestamp, from, to, includeLower, + includeUpper, timeZone, dateMathParser, queryRewriteContext); + } + } + QueryShardContext shardContext = queryRewriteContext.convertToShardContext(); if (shardContext != null) { final MappedFieldType fieldType = shardContext.getFieldType(fieldName); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java index 3f6a6b0be058f..5503618cfe5bb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java @@ -78,6 +78,13 @@ public boolean isComplete() { return shards == null; } + /** + * @return whether this range includes information from all shards and can be used meaningfully. 
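For intuition, the states involved here can be modeled with a toy class (illustrative only, not the real implementation): a range is UNKNOWN while it gives no guarantees, EMPTY when all shards reported and none has a value, or complete with concrete bounds.

// Toy model of the sentinel states and the two checks built on them.
final class FieldRangeSketch {
    static final FieldRangeSketch UNKNOWN = new FieldRangeSketch(true); // complete, but no usable bounds
    static final FieldRangeSketch EMPTY = new FieldRangeSketch(true);   // complete, and no shard has a value

    final boolean complete; // false while at least one shard has not reported its range

    FieldRangeSketch(boolean complete) {
        this.complete = complete;
    }

    boolean containsAllShardRanges() { // usable for coordinator rewrites at all
        return complete && this != UNKNOWN;
    }

    boolean hasTimestampData() { // additionally, at least one document has a value
        return containsAllShardRanges() && this != EMPTY;
    }
}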
+ */ + public boolean containsAllShardRanges() { + return isComplete() && this != IndexLongFieldRange.UNKNOWN; + } + // exposed for testing int[] getShards() { return shards; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4468acfd180a5..7a9a26713b667 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1820,14 +1820,7 @@ public ShardLongFieldRange getTimestampMillisRange() { return ShardLongFieldRange.EMPTY; } - try { - return ShardLongFieldRange.of( - dateFieldType.resolution().roundDownToMillis(rawTimestampFieldRange.getMin()), - dateFieldType.resolution().roundUpToMillis(rawTimestampFieldRange.getMax())); - } catch (IllegalArgumentException e) { - logger.debug(new ParameterizedMessage("could not convert {} to a millisecond time range", rawTimestampFieldRange), e); - return ShardLongFieldRange.UNKNOWN; // any search might match this shard - } + return ShardLongFieldRange.of(rawTimestampFieldRange.getMin(), rawTimestampFieldRange.getMax()); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index de0a8e7250717..d96a31bf570ba 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -102,6 +102,7 @@ import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.recovery.RecoveryStats; @@ -1535,6 +1536,15 @@ public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) { return new QueryRewriteContext(xContentRegistry, namedWriteableRegistry, client, nowInMillis); } + public CoordinatorRewriteContextProvider getCoordinatorRewriteContextProvider(LongSupplier nowInMillis) { + return new CoordinatorRewriteContextProvider(xContentRegistry, + namedWriteableRegistry, + client, + nowInMillis, + clusterService::state, + this::getTimestampFieldType); + } + /** * Clears the caches for the given shard id if the shard is still allocated on this node */ diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 118883ba93125..179446a4a70b4 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -59,6 +59,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.InnerHitContextBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchNoneQueryBuilder; @@ -1187,24 +1188,34 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre try (Releasable ignored2 = canMatchSearcher) { QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), canMatchSearcher, request::nowInMillis, request.getClusterAlias(), 
request.getRuntimeMappings()); - Rewriteable.rewrite(request.getRewriteable(), context, false); - final boolean aliasFilterCanMatch = request.getAliasFilter() - .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; - FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); - MinAndMax minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; - final boolean canMatch; - if (canRewriteToMatchNone(request.source())) { - QueryBuilder queryBuilder = request.source().query(); - canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + final boolean canMatch = queryStillMatchesAfterRewrite(request, context); + final MinAndMax minMax; + if (canMatch || hasRefreshPending) { + FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); + minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; } else { - // null query means match_all - canMatch = aliasFilterCanMatch; + minMax = null; } return new CanMatchResponse(canMatch || hasRefreshPending, minMax); } } } + public static boolean queryStillMatchesAfterRewrite(ShardSearchRequest request, QueryRewriteContext context) throws IOException { + Rewriteable.rewrite(request.getRewriteable(), context, false); + final boolean aliasFilterCanMatch = request.getAliasFilter() + .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; + final boolean canMatch; + if (canRewriteToMatchNone(request.source())) { + QueryBuilder queryBuilder = request.source().query(); + canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + } else { + // null query means match_all + canMatch = aliasFilterCanMatch; + } + return canMatch; + } + /** * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words * if the execution of the search request can be early terminated without executing it. 
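The helper extracted above reduces to a small decision table. A behavior-equivalent sketch, with booleans standing in for the rewritten query objects (illustrative only):

// canMatch after rewriting: the alias filter must not be match_none, and if the
// source could be rewritten to match_none at all, the query must not have been.
final class CanMatchDecision {
    static boolean queryStillMatches(boolean aliasFilterIsMatchNone,
                                     boolean sourceCanRewriteToMatchNone,
                                     boolean queryIsMatchNone) {
        boolean aliasFilterCanMatch = aliasFilterIsMatchNone == false;
        if (sourceCanRewriteToMatchNone) {
            return aliasFilterCanMatch && queryIsMatchNone == false;
        }
        // A null query means match_all, and a source that cannot be rewritten to
        // match none (for instance one whose aggregations must visit all
        // documents) has to be executed regardless of the query.
        return aliasFilterCanMatch;
    }
}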
This is for instance not possible if @@ -1240,6 +1251,10 @@ public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) { return indicesService.getRewriteContext(nowInMillis); } + public CoordinatorRewriteContextProvider getCoordinatorRewriteContextProvider(LongSupplier nowInMillis) { + return indicesService.getCoordinatorRewriteContextProvider(nowInMillis); + } + public IndicesService getIndicesService() { return indicesService; } diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 13796ed770b74..6b5d4bc8b8794 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -22,12 +22,29 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.query.AbstractQueryBuilder; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -44,6 +61,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -53,12 +71,18 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.elasticsearch.action.search.SearchAsyncActionTests.getShardsIter; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { + private final CoordinatorRewriteContextProvider EMPTY_CONTEXT_PROVIDER = new StaticCoordinatorRewriteContextProviderBuilder().build(); + public void testFilterShards() throws InterruptedException { final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), @@ -83,7 +107,7 @@ public void sendCanMatch(Transport.Connection connection, 
ShardSearchRequest req AtomicReference> result = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - GroupShardsIterator shardsIter = SearchAsyncActionTests.getShardsIter("idx", + GroupShardsIterator shardsIter = getShardsIter("idx", new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS), 2, randomBoolean(), primaryNode, replicaNode); final SearchRequest searchRequest = new SearchRequest(); @@ -100,7 +124,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req public void run() throws IOException { result.set(iter); latch.countDown(); - }}, SearchResponse.Clusters.EMPTY); + }}, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER); canMatchPhase.start(); latch.await(); @@ -150,7 +174,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req AtomicReference> result = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - GroupShardsIterator shardsIter = SearchAsyncActionTests.getShardsIter("idx", + GroupShardsIterator shardsIter = getShardsIter("idx", new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS), 2, randomBoolean(), primaryNode, replicaNode); @@ -168,7 +192,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req public void run() throws IOException { result.set(iter); latch.countDown(); - }}, SearchResponse.Clusters.EMPTY); + }}, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER); canMatchPhase.start(); latch.await(); @@ -209,7 +233,7 @@ public void sendCanMatch( final CountDownLatch latch = new CountDownLatch(1); final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); final GroupShardsIterator shardsIter = - SearchAsyncActionTests.getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode); + getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode); final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors())); final SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(true); @@ -272,7 +296,7 @@ protected void executePhaseOnShard( listener.onFailure(new Exception("failure")); } } - }, SearchResponse.Clusters.EMPTY); + }, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER); canMatchPhase.start(); latch.await(); @@ -315,7 +339,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req AtomicReference> result = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - GroupShardsIterator shardsIter = SearchAsyncActionTests.getShardsIter("logs", + GroupShardsIterator shardsIter = getShardsIter("logs", new OriginalIndices(new String[]{"logs"}, SearchRequest.DEFAULT_INDICES_OPTIONS), randomIntBetween(2, 20), randomBoolean(), primaryNode, replicaNode); final SearchRequest searchRequest = new SearchRequest(); @@ -334,7 +358,7 @@ public void run() { result.set(iter); latch.countDown(); } - }, SearchResponse.Clusters.EMPTY); + }, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER); canMatchPhase.start(); latch.await(); @@ -392,7 +416,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req AtomicReference> result = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - GroupShardsIterator shardsIter = SearchAsyncActionTests.getShardsIter("logs", + GroupShardsIterator shardsIter = 
getShardsIter("logs", new OriginalIndices(new String[]{"logs"}, SearchRequest.DEFAULT_INDICES_OPTIONS), numShards, randomBoolean(), primaryNode, replicaNode); final SearchRequest searchRequest = new SearchRequest(); @@ -411,7 +435,7 @@ public void run() { result.set(iter); latch.countDown(); } - }, SearchResponse.Clusters.EMPTY); + }, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER); canMatchPhase.start(); latch.await(); @@ -423,4 +447,360 @@ public void run() { assertThat(result.get().size(), equalTo(numShards)); } } + + public void testCanMatchFilteringOnCoordinatorThatCanBeSkipped() throws Exception { + Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID()); + Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID()); + final List dataStreamIndices = org.elasticsearch.common.collect.List.of(dataStreamIndex1, dataStreamIndex2); + DataStream dataStream = + new DataStream("mydata", new DataStream.TimestampField("@timestamp"), dataStreamIndices); + + List regularIndices = + randomList(0, 2, () -> new Index(randomAlphaOfLength(10), UUIDs.base64UUID())); + + long indexMinTimestamp = randomLongBetween(0, 5000); + long indexMaxTimestamp = randomLongBetween(indexMinTimestamp, 5000 * 2); + StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder(); + String timestampFieldName = dataStream.getTimeStampField().getName(); + for (Index dataStreamIndex : dataStream.getIndices()) { + contextProviderBuilder.addIndexMinMaxTimestamps(dataStreamIndex, timestampFieldName, indexMinTimestamp, indexMaxTimestamp); + } + + RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName); + // We query a range outside of the timestamp range covered by both datastream indices + rangeQueryBuilder + .from(indexMaxTimestamp + 1) + .to(indexMaxTimestamp + 2); + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder() + .filter(rangeQueryBuilder); + + if (randomBoolean()) { + // Add an additional filter that cannot be evaluated in the coordinator but shouldn't + // affect the end result as we're filtering + queryBuilder.filter(new TermQueryBuilder("fake", "value")); + } + + assignShardsAndExecuteCanMatchPhase(dataStream, regularIndices, contextProviderBuilder.build(), queryBuilder, + (updatedSearchShardIterators, requests) -> { + List skippedShards = updatedSearchShardIterators.stream() + .filter(SearchShardIterator::skip) + .collect(Collectors.toList());; + + List nonSkippedShards = updatedSearchShardIterators.stream() + .filter(searchShardIterator -> searchShardIterator.skip() == false) + .collect(Collectors.toList());; + + int regularIndexShardCount = (int) updatedSearchShardIterators.stream() + .filter(s -> regularIndices.contains(s.shardId().getIndex())) + .count(); + + // When all the shards can be skipped we should query at least 1 + // in order to get a valid search response. 
+ if (regularIndexShardCount == 0) { + assertThat(nonSkippedShards.size(), equalTo(1)); + } else { + boolean allNonSkippedShardsAreFromRegularIndices = nonSkippedShards.stream() + .allMatch(shardIterator -> regularIndices.contains(shardIterator.shardId().getIndex())); + + assertThat(allNonSkippedShardsAreFromRegularIndices, equalTo(true)); + } + + boolean allSkippedShardAreFromDataStream = skippedShards.stream() + .allMatch(shardIterator -> dataStream.getIndices().contains(shardIterator.shardId().getIndex())); + assertThat(allSkippedShardAreFromDataStream, equalTo(true)); + + boolean allRequestsWereTriggeredAgainstRegularIndices = requests.stream() + .allMatch(request -> regularIndices.contains(request.shardId().getIndex())); + assertThat(allRequestsWereTriggeredAgainstRegularIndices, equalTo(true)); + }); + } + + public void testCanMatchFilteringOnCoordinatorParsingFails() throws Exception { + Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID()); + Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID()); + final List dataStreamIndices = org.elasticsearch.common.collect.List.of(dataStreamIndex1, dataStreamIndex2); + DataStream dataStream = + new DataStream("mydata", new DataStream.TimestampField("@timestamp"), dataStreamIndices); + + List regularIndices = + randomList(0, 2, () -> new Index(randomAlphaOfLength(10), UUIDs.base64UUID())); + + long indexMinTimestamp = randomLongBetween(0, 5000); + long indexMaxTimestamp = randomLongBetween(indexMinTimestamp, 5000 * 2); + StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder(); + String timestampFieldName = dataStream.getTimeStampField().getName(); + for (Index dataStreamIndex : dataStream.getIndices()) { + contextProviderBuilder.addIndexMinMaxTimestamps(dataStreamIndex, timestampFieldName, indexMinTimestamp, indexMaxTimestamp); + } + + RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName); + // Query with a non default date format + rangeQueryBuilder + .from("2020-1-01") + .to("2021-1-01"); + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder() + .filter(rangeQueryBuilder); + + if (randomBoolean()) { + // Add an additional filter that cannot be evaluated in the coordinator but shouldn't + // affect the end result as we're filtering + queryBuilder.filter(new TermQueryBuilder("fake", "value")); + } + + assignShardsAndExecuteCanMatchPhase(dataStream, + regularIndices, + contextProviderBuilder.build(), + queryBuilder, + this::assertAllShardsAreQueried + ); + } + + public void testCanMatchFilteringOnCoordinatorThatCanNotBeSkipped() throws Exception { + // Generate indices + Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID()); + Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID()); + final List dataStreamIndices = org.elasticsearch.common.collect.List.of(dataStreamIndex1, dataStreamIndex2); + DataStream dataStream = + new DataStream("mydata", new DataStream.TimestampField("@timestamp"), dataStreamIndices); + + List regularIndices = + randomList(0, 2, () -> new Index(randomAlphaOfLength(10), UUIDs.base64UUID())); + + long indexMinTimestamp = 10; + long indexMaxTimestamp = 20; + StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder(); + String timestampFieldName = dataStream.getTimeStampField().getName(); + for (Index dataStreamIndex : dataStream.getIndices()) { + 
contextProviderBuilder.addIndexMinMaxTimestamps(dataStreamIndex, timestampFieldName, indexMinTimestamp, indexMaxTimestamp); + } + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); + // Query inside of the data stream index range + if (randomBoolean()) { + // Query generation + RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName); + // We query a range within the timestamp range covered by both datastream indices + rangeQueryBuilder + .from(indexMinTimestamp) + .to(indexMaxTimestamp); + + queryBuilder.filter(rangeQueryBuilder); + + if (randomBoolean()) { + // Add an additional filter that cannot be evaluated in the coordinator but shouldn't + // affect the end result as we're filtering + queryBuilder.filter(new TermQueryBuilder("fake", "value")); + } + } else { + // We query a range outside of the timestamp range covered by both datastream indices + RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName) + .from(indexMaxTimestamp + 1) + .to(indexMaxTimestamp + 2); + + TermQueryBuilder termQueryBuilder = new TermQueryBuilder("fake", "value"); + + // This is always evaluated as true in the coordinator as we cannot determine there if + // the term query clause is false. + queryBuilder.should(rangeQueryBuilder) + .should(termQueryBuilder); + } + + assignShardsAndExecuteCanMatchPhase(dataStream, + regularIndices, + contextProviderBuilder.build(), + queryBuilder, + this::assertAllShardsAreQueried + ); + } + + private void assertAllShardsAreQueried(List updatedSearchShardIterators, List requests) { + int skippedShards = (int) updatedSearchShardIterators.stream() + .filter(SearchShardIterator::skip) + .count(); + + assertThat(skippedShards, equalTo(0)); + + int nonSkippedShards = (int) updatedSearchShardIterators.stream() + .filter(searchShardIterator -> searchShardIterator.skip() == false) + .count(); + + assertThat(nonSkippedShards, equalTo(updatedSearchShardIterators.size())); + + int shardsWithPrimariesAssigned = (int) updatedSearchShardIterators.stream() + .filter(s -> s.size() > 0) + .count(); + assertThat(requests.size(), equalTo(shardsWithPrimariesAssigned)); + } + + private > + void assignShardsAndExecuteCanMatchPhase(DataStream dataStream, + List regularIndices, + CoordinatorRewriteContextProvider contextProvider, + AbstractQueryBuilder query, + BiConsumer, + List> canMatchResultsConsumer) throws Exception { + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + + List indicesToSearch = new ArrayList<>(); + indicesToSearch.add(dataStream.getName()); + for (Index regularIndex : regularIndices) { + indicesToSearch.add(regularIndex.getName()); + } + + String[] indices = indicesToSearch.toArray(new String[0]); + OriginalIndices originalIndices = new OriginalIndices(indices, SearchRequest.DEFAULT_INDICES_OPTIONS); + + boolean atLeastOnePrimaryAssigned = false; + final List originalShardIters = new ArrayList<>(); + for (Index dataStreamIndex : dataStream.getIndices()) { + // If we have to execute the can match request against all the shards + // and none is assigned, the phase is considered as failed meaning that the next phase won't be executed + boolean withAssignedPrimaries = 
randomBoolean() || atLeastOnePrimaryAssigned == false; + int numShards = randomIntBetween(1, 6); + originalShardIters.addAll( + getShardsIter(dataStreamIndex, + originalIndices, + numShards, + false, + withAssignedPrimaries ? primaryNode : null, + null) + ); + atLeastOnePrimaryAssigned |= withAssignedPrimaries; + } + + for (Index regularIndex : regularIndices) { + originalShardIters.addAll( + getShardsIter(regularIndex, + originalIndices, + randomIntBetween(1, 6), + randomBoolean(), + primaryNode, + replicaNode) + ); + } + GroupShardsIterator shardsIter = GroupShardsIterator.sortAndCreate(originalShardIters); + + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(indices); + searchRequest.allowPartialSearchResults(true); + + final AliasFilter aliasFilter; + if (randomBoolean()) { + // Apply the query on the request body + SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource(); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + + // Sometimes apply the same query in the alias filter too + aliasFilter = new AliasFilter(randomBoolean() ? query : null, Strings.EMPTY_ARRAY); + } else { + // Apply the query as an alias filter + aliasFilter = new AliasFilter(query, Strings.EMPTY_ARRAY); + } + + Map aliasFilters = new HashMap<>(); + for (Index dataStreamIndex : dataStream.getIndices()) { + aliasFilters.put(dataStreamIndex.getUUID(), aliasFilter); + } + + for (Index regularIndex : regularIndices) { + aliasFilters.put(regularIndex.getUUID(), aliasFilter); + } + + // We respond by default that the query can match + final List requests = Collections.synchronizedList(new ArrayList<>()); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { + @Override + public void sendCanMatch(Transport.Connection connection, ShardSearchRequest request, SearchTask task, + ActionListener listener) { + requests.add(request); + listener.onResponse(new SearchService.CanMatchResponse(true, null)); + } + }; + + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + + AtomicReference> result = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + aliasFilters, + Collections.emptyMap(), + EsExecutors.newDirectExecutorService(), + searchRequest, + null, + shardsIter, + timeProvider, + ClusterState.EMPTY_STATE, + null, + (iter) -> new SearchPhase("test") { + @Override + public void run() throws IOException { + result.set(iter); + latch.countDown(); + } + }, + SearchResponse.Clusters.EMPTY, + contextProvider); + + canMatchPhase.start(); + latch.await(); + + List updatedSearchShardIterators = new ArrayList<>(); + for (SearchShardIterator updatedSearchShardIterator : result.get()) { + updatedSearchShardIterators.add(updatedSearchShardIterator); + } + + canMatchResultsConsumer.accept(updatedSearchShardIterators, requests); + } + + private static class StaticCoordinatorRewriteContextProviderBuilder { + private ClusterState clusterState = ClusterState.EMPTY_STATE; + private final Map fields = new HashMap<>(); + + private void addIndexMinMaxTimestamps(Index index, String fieldName, long minTimeStamp, long maxTimestamp) { + if (clusterState.metadata().index(index) != null) { + throw new IllegalArgumentException("Min/Max timestamps for " + 
index + " were already defined"); + } + + IndexLongFieldRange timestampMillisRange = IndexLongFieldRange.NO_SHARDS + .extendWithShardRange(0, 1, ShardLongFieldRange.of(minTimeStamp, maxTimestamp)); + + Settings.Builder indexSettings = settings(Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()); + + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName()) + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .timestampMillisRange(timestampMillisRange); + + Metadata.Builder metadataBuilder = + Metadata.builder(clusterState.metadata()) + .put(indexMetadataBuilder); + + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder) + .build(); + + fields.put(index, new DateFieldMapper.DateFieldType(fieldName)); + } + + public CoordinatorRewriteContextProvider build() { + return new CoordinatorRewriteContextProvider(NamedXContentRegistry.EMPTY, + mock(NamedWriteableRegistry.class), + mock(Client.class), + System::currentTimeMillis, + () -> clusterState, + fields::get); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 02ba742ef2e94..644d29bb70a84 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -48,12 +48,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -61,6 +63,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class SearchAsyncActionTests extends ESTestCase { @@ -575,26 +578,107 @@ protected void executeNext(Runnable runnable, Thread originalThread) { assertThat(numFailReplicas.get(), greaterThanOrEqualTo(1)); } + public void testSkipUnavailableSearchShards() throws InterruptedException { + SearchRequest request = new SearchRequest(); + request.allowPartialSearchResults(true); + ActionListener responseListener = ActionListener.wrap(response -> {}, + (e) -> { throw new AssertionError("unexpected", e);}); + DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + + final int numUnavailableSkippedShards = randomIntBetween(1, 10); + List searchShardIterators = new ArrayList<>(numUnavailableSkippedShards); + OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); + for (int i = 0; i < numUnavailableSkippedShards; i++) { + Index index = new Index("idx", "_na_"); + SearchShardIterator searchShardIterator = + new SearchShardIterator(null, new ShardId(index, 0), Collections.emptyList(), originalIndices); + // Skip all the shards + searchShardIterator.resetAndSkip(); + 
searchShardIterators.add(searchShardIterator); + } + GroupShardsIterator shardsIter = new GroupShardsIterator<>(searchShardIterators); + Map lookup = Collections.singletonMap(primaryNode.getId(), new MockConnection(primaryNode)); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean searchPhaseDidRun = new AtomicBoolean(false); + AbstractSearchAsyncAction asyncAction = + new AbstractSearchAsyncAction( + "test", + logger, + new SearchTransportService(null, null, null), + (cluster, node) -> { + assert cluster == null : "cluster was not null: " + cluster; + return lookup.get(node); }, + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), + null, + request, + responseListener, + shardsIter, + new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), + ClusterState.EMPTY_STATE, + null, + new ArraySearchPhaseResults<>(shardsIter.size()), + request.getMaxConcurrentShardRequests(), + SearchResponse.Clusters.EMPTY) { + + @Override + protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, + SearchActionListener listener) { + assert false : "Expected to skip all shards"; + } + + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() { + assertTrue(searchPhaseDidRun.compareAndSet(false, true)); + latch.countDown(); + } + }; + } + }; + asyncAction.start(); + assertThat(latch.await(4, TimeUnit.SECONDS), equalTo(true)); + assertThat(searchPhaseDidRun.get(), equalTo(true)); + + SearchResponse searchResponse = + asyncAction.buildSearchResponse(null, asyncAction.buildShardFailures(), null, null); + assertThat(searchResponse.getSkippedShards(), equalTo(numUnavailableSkippedShards)); + assertThat(searchResponse.getFailedShards(), equalTo(0)); + assertThat(searchResponse.getSuccessfulShards(), equalTo(shardsIter.size())); + } + static GroupShardsIterator getShardsIter(String index, OriginalIndices originalIndices, int numShards, boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) { + return new GroupShardsIterator<>( + getShardsIter(new Index(index, "_na_"), originalIndices, numShards, doReplicas, primaryNode, replicaNode) + ); + } + + static List getShardsIter(Index index, OriginalIndices originalIndices, int numShards, + boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) { ArrayList list = new ArrayList<>(); for (int i = 0; i < numShards; i++) { ArrayList started = new ArrayList<>(); ArrayList initializing = new ArrayList<>(); ArrayList unassigned = new ArrayList<>(); - ShardRouting routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), true, + ShardRouting routing = ShardRouting.newUnassigned(new ShardId(index, i), true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); - routing = routing.initialize(primaryNode.getId(), i + "p", 0); - routing.started(); - started.add(routing); - if (doReplicas) { - routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), false, + if (primaryNode != null) { + routing = routing.initialize(primaryNode.getId(), i + "p", 0); + routing = routing.moveToStarted(); + started.add(routing); + } + if (doReplicas && primaryNode != null) { + routing = ShardRouting.newUnassigned(new ShardId(index, i), false, RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); 
if (replicaNode != null) { routing = routing.initialize(replicaNode.getId(), i + "r", 0); if (randomBoolean()) { - routing.started(); + routing = routing.moveToStarted(); started.add(routing); } else { initializing.add(routing); @@ -605,11 +689,12 @@ static GroupShardsIterator getShardsIter(String index, Orig } Collections.shuffle(started, random()); started.addAll(initializing); - list.add(new SearchShardIterator(null, new ShardId(new Index(index, "_na_"), i), started, originalIndices)); + list.add(new SearchShardIterator(null, new ShardId(index, i), started, originalIndices)); } - return new GroupShardsIterator<>(list); + return list; } + public static class TestSearchResponse extends SearchResponse { final Set queried = new HashSet<>(); diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 26f1f7669517a..55890977d2fa3 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.IndexLongFieldRange; @@ -52,9 +53,9 @@ import org.elasticsearch.xpack.core.XPackClient; import org.elasticsearch.xpack.frozen.FrozenIndices; import org.hamcrest.Matchers; -import org.joda.time.Instant; import java.io.IOException; +import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -511,8 +512,8 @@ public void testComputesTimestampRangeFromMilliseconds() { assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); assertTrue(timestampFieldRange.isComplete()); - assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis())); - assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); + assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").toEpochMilli())); + assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").toEpochMilli())); for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) { assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range", @@ -544,8 +545,11 @@ public void testComputesTimestampRangeFromNanoseconds() throws IOException { assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); assertTrue(timestampFieldRange.isComplete()); - assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis())); - assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.568Z").getMillis())); + final DateFieldMapper.Resolution resolution = DateFieldMapper.Resolution.NANOSECONDS; + assertThat(timestampFieldRange.getMin(), 
+            equalTo(resolution.convert(Instant.parse("2010-01-05T01:02:03.456789012Z"))));
+        assertThat(timestampFieldRange.getMax(),
+            equalTo(resolution.convert(Instant.parse("2010-01-06T02:03:04.567890123Z"))));
 
         for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) {
             assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range",
diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java
new file mode 100644
index 0000000000000..7f61ad855cd6d
--- /dev/null
+++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java
@@ -0,0 +1,507 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.searchablesnapshots;
+
+import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.List;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.shard.IndexLongFieldRange;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
+import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Locale;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING;
+import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class SearchableSnapshotsCanMatchOnCoordinatorIntegTests extends BaseSearchableSnapshotsIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(LocalStateSearchableSnapshots.class, MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            // Use an unbound cache so we can fully recover the searchable snapshot every time
+            .put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES))
+            .build();
+    }
+
+    public void testSearchableSnapshotShardsAreSkippedWithoutQueryingAnyNodeWhenTheyAreOutsideOfTheQueryRange() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+        final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode();
+        final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode();
+        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNodeHoldingSearchableSnapshot);
+
+        final String indexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final int indexOutsideSearchRangeShardCount = randomIntBetween(1, 3);
+        createIndexWithTimestamp(indexOutsideSearchRange, indexOutsideSearchRangeShardCount, Settings.EMPTY);
+
+        final String indexWithinSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final int indexWithinSearchRangeShardCount = randomIntBetween(1, 3);
+        createIndexWithTimestamp(
+            indexWithinSearchRange,
+            indexWithinSearchRangeShardCount,
+            Settings.builder()
+                .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex)
+                .build()
+        );
+
+        final int totalShards = indexOutsideSearchRangeShardCount + indexWithinSearchRangeShardCount;
+
+        // Either index documents outside of the search range, or documents without any timestamp data
+        final boolean indexDataWithTimestamp = randomBoolean();
+        if (indexDataWithTimestamp) {
+            indexDocumentsWithTimestampWithinDate(indexOutsideSearchRange, between(0, 1000), "2020-11-26T%02d:%02d:%02d.%09dZ");
+        } else {
+            indexRandomDocs(indexOutsideSearchRange, between(0, 1000));
+        }
+
+        int numDocsWithinRange = between(0, 1000);
+        indexDocumentsWithTimestampWithinDate(indexWithinSearchRange, numDocsWithinRange, "2020-11-28T%02d:%02d:%02d.%09dZ");
+
+        final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createRepository(repositoryName, "mock");
+
+        final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indexOutsideSearchRange)).snapshotId();
+        assertAcked(client().admin().indices().prepareDelete(indexOutsideSearchRange));
+
+        final String searchableSnapshotIndexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+
+        // Block the repository for the node holding the searchable snapshot shards
+        // to delay their restore
+        blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot);
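A hedged key to the seven positional arguments of the MountSearchableSnapshotRequest built below; the parameter names here are inferred from the values the tests pass and are not spelled out by this patch:

    new MountSearchableSnapshotRequest(
        mountedIndexName,      // the name the searchable snapshot index is mounted under
        repositoryName,        // the snapshot repository to read from
        snapshotName,          // snapshotId.getName() in these tests
        snapshotIndexName,     // the index inside the snapshot to mount
        indexSettings,         // extra settings applied to the mounted index
        ignoredIndexSettings,  // index settings to strip while mounting (empty here)
        waitForCompletion);    // false: the mount call returns before recovery finishes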
+
+        // Force the searchable snapshot to be allocated on a particular node
+        Settings restoredIndexSettings = Settings.builder()
+            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString())
+            .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot)
+            .build();
+
+        final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
+            searchableSnapshotIndexOutsideSearchRange,
+            repositoryName,
+            snapshotId.getName(),
+            indexOutsideSearchRange,
+            restoredIndexSettings,
+            Strings.EMPTY_ARRAY,
+            false
+        );
+        client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet();
+
+        final IndexMetadata indexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange);
+        assertThat(indexMetadata.getTimestampMillisRange(), equalTo(IndexLongFieldRange.NO_SHARDS));
+
+        DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(indexMetadata.getIndex());
+        assertThat(timestampFieldType, nullValue());
+
+        final boolean includeIndexCoveringSearchRangeInSearchRequest = randomBoolean();
+        java.util.List<String> indicesToSearch = new ArrayList<>();
+        if (includeIndexCoveringSearchRangeInSearchRequest) {
+            indicesToSearch.add(indexWithinSearchRange);
+        }
+        indicesToSearch.add(searchableSnapshotIndexOutsideSearchRange);
+        SearchRequest request = new SearchRequest().indices(indicesToSearch.toArray(new String[0]))
+            .source(
+                new SearchSourceBuilder().query(
+                    QueryBuilders.rangeQuery(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
+                        .from("2020-11-28T00:00:00.000000000Z", true)
+                        .to("2020-11-29T00:00:00.000000000Z")
+                )
+            );
+
+        if (includeIndexCoveringSearchRangeInSearchRequest) {
+            SearchResponse searchResponse = client().search(request).actionGet();
+
+            // All the regular index shard searches succeeded
+            assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount));
+            // All the searchable snapshot shard searches failed
+            assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
+            assertThat(searchResponse.getSkippedShards(), equalTo(0));
+            assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
+        } else {
+            // All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp
+            // is not available yet
+            expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet());
+        }
+
+        // Allow the searchable snapshot to finish mounting
+        unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot);
+        waitUntilRecoveryIsDone(searchableSnapshotIndexOutsideSearchRange);
+        ensureGreen(searchableSnapshotIndexOutsideSearchRange);
+
+        final IndexMetadata updatedIndexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange);
+        final IndexLongFieldRange updatedTimestampMillisRange = updatedIndexMetadata.getTimestampMillisRange();
+        final DateFieldMapper.DateFieldType dateFieldType = indicesService.getTimestampFieldType(updatedIndexMetadata.getIndex());
+        assertThat(dateFieldType, notNullValue());
+        final DateFieldMapper.Resolution resolution = dateFieldType.resolution();
+        assertThat(updatedTimestampMillisRange.isComplete(), equalTo(true));
+        if (indexDataWithTimestamp) {
+            assertThat(updatedTimestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
+            assertThat(
+                updatedTimestampMillisRange.getMin(),
+                greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-26T00:00:00Z")))
+            );
+            assertThat(updatedTimestampMillisRange.getMax(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-27T00:00:00Z"))));
+        } else {
+            assertThat(updatedTimestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY));
+        }
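The resolution.convert(...) calls above matter because date and date_nanos mappings encode the same instant as different long values in the recorded range. A minimal sketch of the difference, assuming only that Resolution.MILLISECONDS stores epoch milliseconds and Resolution.NANOSECONDS stores epoch nanoseconds (the literals below were computed by hand):

    Instant instant = Instant.parse("2020-11-26T00:00:00Z");
    long millis = DateFieldMapper.Resolution.MILLISECONDS.convert(instant); // 1606348800000
    long nanos = DateFieldMapper.Resolution.NANOSECONDS.convert(instant);   // 1606348800000000000
    // Assertions against IndexLongFieldRange.getMin()/getMax() must therefore convert
    // expected Instants with the same resolution as the field's mapping.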
+
+        // Stop the node holding the searchable snapshot shards; since the index
+        // allocation criteria require them to be allocated on that node, the
+        // shards should remain unassigned
+        internalCluster().stopNode(dataNodeHoldingSearchableSnapshot);
+        waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex());
+
+        if (includeIndexCoveringSearchRangeInSearchRequest) {
+            SearchResponse newSearchResponse = client().search(request).actionGet();
+
+            assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount));
+            assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
+            assertThat(newSearchResponse.getFailedShards(), equalTo(0));
+            assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
+            assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange));
+        } else {
+            if (indexOutsideSearchRangeShardCount == 1) {
+                expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet());
+            } else {
+                SearchResponse newSearchResponse = client().search(request).actionGet();
+                // When all shards are skipped, at least one of them should be queried in order to
+                // provide a proper search response.
+                assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
+                assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
+                assertThat(newSearchResponse.getFailedShards(), equalTo(1));
+                assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount));
+            }
+        }
+    }
+
+    public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+        final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode();
+        final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode();
+        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNodeHoldingSearchableSnapshot);
+
+        final String indexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final int indexOutsideSearchRangeShardCount = randomIntBetween(1, 3);
+        createIndexWithTimestamp(
+            indexOutsideSearchRange,
+            indexOutsideSearchRangeShardCount,
+            Settings.builder()
+                .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex)
+                .build()
+        );
+
+        indexDocumentsWithTimestampWithinDate(indexOutsideSearchRange, between(0, 1000), "2020-11-26T%02d:%02d:%02d.%09dZ");
+
+        final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createRepository(repositoryName, "mock");
+
+        final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indexOutsideSearchRange)).snapshotId();
+
+        final String searchableSnapshotIndexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+
+        // Block the repository for the node holding the searchable snapshot shards
+        // to delay their restore
+        blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot);
+
+        // Force the searchable snapshot to be allocated on a particular node
+        Settings restoredIndexSettings = Settings.builder()
+            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString())
+            .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot)
+            .build();
+
+        final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
+            searchableSnapshotIndexOutsideSearchRange,
+            repositoryName,
+            snapshotId.getName(),
+            indexOutsideSearchRange,
+            restoredIndexSettings,
+            Strings.EMPTY_ARRAY,
+            false
+        );
+        client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet();
+        final int searchableSnapshotShardCount = indexOutsideSearchRangeShardCount;
+
+        final IndexMetadata indexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange);
+        assertThat(indexMetadata.getTimestampMillisRange(), equalTo(IndexLongFieldRange.NO_SHARDS));
+
+        DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(indexMetadata.getIndex());
+        assertThat(timestampFieldType, nullValue());
+
+        SearchRequest request = new SearchRequest().indices(indexOutsideSearchRange, searchableSnapshotIndexOutsideSearchRange)
+            .source(
+                new SearchSourceBuilder().query(
+                    QueryBuilders.rangeQuery(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
+                        .from("2020-11-28T00:00:00.000000000Z", true)
+                        .to("2020-11-29T00:00:00.000000000Z")
+                )
+            );
+
+        final int totalShards = indexOutsideSearchRangeShardCount + searchableSnapshotShardCount;
+        SearchResponse searchResponse = client().search(request).actionGet();
+
+        // All the regular index shard searches succeeded
+        assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount));
+        // All the searchable snapshot shard searches failed
+        assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
+        assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount));
+        assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
+        assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
+
+        // Allow the searchable snapshot to finish mounting
+        unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot);
+        waitUntilRecoveryIsDone(searchableSnapshotIndexOutsideSearchRange);
+        ensureGreen(searchableSnapshotIndexOutsideSearchRange);
+
+        final IndexMetadata updatedIndexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange);
+        final IndexLongFieldRange updatedTimestampMillisRange = updatedIndexMetadata.getTimestampMillisRange();
+        final DateFieldMapper.DateFieldType dateFieldType = indicesService.getTimestampFieldType(updatedIndexMetadata.getIndex());
+        assertThat(dateFieldType, notNullValue());
+        final DateFieldMapper.Resolution resolution = dateFieldType.resolution();
+        assertThat(updatedTimestampMillisRange.isComplete(), equalTo(true));
+        assertThat(updatedTimestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
+        assertThat(updatedTimestampMillisRange.getMin(), greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-26T00:00:00Z"))));
+        assertThat(updatedTimestampMillisRange.getMax(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-27T00:00:00Z"))));
+
+        // Stop the node holding the searchable snapshot shards; since the index
+        // allocation criteria require them to be allocated on that node, the
+        // shards should remain unassigned
+        internalCluster().stopNode(dataNodeHoldingSearchableSnapshot);
+        waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex());
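At this point every shard in the request is either skippable by its recorded timestamp range or unassigned, so the coordinator's copy of the range in the IndexMetadata is the only information available. A minimal sketch of the skip decision the next search relies on, assuming inclusive, epoch-encoded bounds; this is an illustration of the idea, not the coordinator code this patch adds:

    // A shard may be skipped only when the recorded range cannot intersect the query window.
    static boolean canMatchOnCoordinator(IndexLongFieldRange range, long queryFrom, long queryTo) {
        if (range == IndexLongFieldRange.UNKNOWN || range.isComplete() == false) {
            return true;  // no usable bounds: the shard must be queried (and fails if unassigned)
        }
        if (range == IndexLongFieldRange.EMPTY) {
            return false; // no timestamped documents at all, so nothing can match
        }
        return range.getMax() >= queryFrom && range.getMin() <= queryTo; // overlap test
    }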
+
+        SearchResponse newSearchResponse = client().search(request).actionGet();
+
+        // Every shard search succeeded (skipped shards are counted as successful)
+        assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
+        assertThat(newSearchResponse.getFailedShards(), equalTo(0));
+        // We have to query at least one shard to construct a valid search response,
+        // so an available shard is queried rather than skipped
+        assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1));
+        assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
+        assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L));
+    }
+
+    public void testSearchableSnapshotShardsThatHaveMatchingDataAreNotSkippedOnTheCoordinatingNode() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+        final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode();
+        final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode();
+        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNodeHoldingSearchableSnapshot);
+
+        final String indexWithinSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final int indexWithinSearchRangeShardCount = randomIntBetween(1, 3);
+        createIndexWithTimestamp(
+            indexWithinSearchRange,
+            indexWithinSearchRangeShardCount,
+            Settings.builder()
+                .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex)
+                .build()
+        );
+
+        indexDocumentsWithTimestampWithinDate(indexWithinSearchRange, between(1, 1000), "2020-11-28T%02d:%02d:%02d.%09dZ");
+
+        final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createRepository(repositoryName, "mock");
+
+        final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indexWithinSearchRange)).snapshotId();
+        assertAcked(client().admin().indices().prepareDelete(indexWithinSearchRange));
+
+        final String searchableSnapshotIndexWithinSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+
+        // Block the repository for the node holding the searchable snapshot shards
+        // to delay their restore
+        blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot);
+
+        // Force the searchable snapshot to be allocated on a particular node
+        Settings restoredIndexSettings = Settings.builder()
+            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString())
+            .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot)
+            .build();
+
+        final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
+            searchableSnapshotIndexWithinSearchRange,
+            repositoryName,
+            snapshotId.getName(),
+            indexWithinSearchRange,
+            restoredIndexSettings,
+            Strings.EMPTY_ARRAY,
+            false
+        );
+        client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet();
+
+        final IndexMetadata indexMetadata = getIndexMetadata(searchableSnapshotIndexWithinSearchRange);
+        assertThat(indexMetadata.getTimestampMillisRange(), equalTo(IndexLongFieldRange.NO_SHARDS));
+
+        DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(indexMetadata.getIndex());
+        assertThat(timestampFieldType, nullValue());
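Unlike the two previous tests, the documents here (timestamped within 2020-11-28) fall inside the query window, so the coordinator must not skip these shards. For reference, both bounds of the range used throughout this file are inclusive: from(value, true) sets an inclusive lower bound and to(value) leaves the upper bound inclusive by default, so the request built next is equivalent to:

    QueryBuilders.rangeQuery("@timestamp") // DataStream.TimestampField.FIXED_TIMESTAMP_FIELD
        .gte("2020-11-28T00:00:00.000000000Z")
        .lte("2020-11-29T00:00:00.000000000Z");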
+
+        SearchRequest request = new SearchRequest().indices(searchableSnapshotIndexWithinSearchRange)
+            .source(
+                new SearchSourceBuilder().query(
+                    QueryBuilders.rangeQuery(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
+                        .from("2020-11-28T00:00:00.000000000Z", true)
+                        .to("2020-11-29T00:00:00.000000000Z")
+                )
+            );
+
+        // All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp
+        // is not available yet
+        expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet());
+
+        // Allow the searchable snapshot to finish mounting
+        unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot);
+        waitUntilRecoveryIsDone(searchableSnapshotIndexWithinSearchRange);
+        ensureGreen(searchableSnapshotIndexWithinSearchRange);
+
+        final IndexMetadata updatedIndexMetadata = getIndexMetadata(searchableSnapshotIndexWithinSearchRange);
+        final IndexLongFieldRange updatedTimestampMillisRange = updatedIndexMetadata.getTimestampMillisRange();
+        final DateFieldMapper.DateFieldType dateFieldType = indicesService.getTimestampFieldType(updatedIndexMetadata.getIndex());
+        assertThat(dateFieldType, notNullValue());
+        final DateFieldMapper.Resolution resolution = dateFieldType.resolution();
+        assertThat(updatedTimestampMillisRange.isComplete(), equalTo(true));
+        assertThat(updatedTimestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
+        assertThat(updatedTimestampMillisRange.getMin(), greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-28T00:00:00Z"))));
+        assertThat(updatedTimestampMillisRange.getMax(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-29T00:00:00Z"))));
+
+        // Stop the node holding the searchable snapshot shards; since the index
+        // allocation criteria require them to be allocated on that node, the
+        // shards should remain unassigned
+        internalCluster().stopNode(dataNodeHoldingSearchableSnapshot);
+        waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex());
+
+        // The range query matches, but the shards that hold the data are unavailable;
+        // the search therefore fails
+        expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet());
+    }
+
+    private void createIndexWithTimestamp(String indexName, int numShards, Settings extraSettings) throws IOException {
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate(indexName)
+                .addMapping(
+                    "_doc",
+                    XContentFactory.jsonBuilder()
+                        .startObject()
+                        .startObject("properties")
+                        .startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
+                        .field("type", randomFrom("date", "date_nanos"))
+                        .field("format", "strict_date_optional_time_nanos")
+                        .endObject()
+                        .endObject()
+                        .endObject()
+                )
+                .setSettings(indexSettingsNoReplicas(numShards).put(INDEX_SOFT_DELETES_SETTING.getKey(), true).put(extraSettings))
+        );
+        ensureGreen(indexName);
+    }
+
+    private void indexDocumentsWithTimestampWithinDate(String indexName, int docCount, String timestampTemplate) throws Exception {
+        final java.util.List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+        for (int i = 0; i < docCount; i++) {
+            indexRequestBuilders.add(
+                client().prepareIndex(indexName, "_doc")
+                    .setSource(
+                        DataStream.TimestampField.FIXED_TIMESTAMP_FIELD,
+                        String.format(
+                            Locale.ROOT,
+                            timestampTemplate,
+                            between(0, 23),
+                            between(0, 59),
+                            between(0, 59),
+                            randomLongBetween(0, 999999999L)
+                        )
+                    )
+            );
+        }
+        indexRandom(true, false, indexRequestBuilders);
+
+        assertThat(
+            client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(),
+            equalTo(0)
+        );
+        refresh(indexName);
+        forceMerge();
+    }
+
+    private IndexMetadata getIndexMetadata(String indexName) {
+        return client().admin()
+            .cluster()
+            .prepareState()
+            .clear()
+            .setMetadata(true)
+            .setIndices(indexName)
+            .get()
+            .getState()
+            .metadata()
+            .index(indexName);
+    }
+
+    private void waitUntilRecoveryIsDone(String index) throws Exception {
+        assertBusy(() -> {
+            RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(index).get();
+            assertThat(recoveryResponse.hasRecoveries(), equalTo(true));
+            for (java.util.List<RecoveryState> value : recoveryResponse.shardRecoveryStates().values()) {
+                for (RecoveryState recoveryState : value) {
+                    assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
+                }
+            }
+        });
+    }
+
+    private void waitUntilAllShardsAreUnassigned(Index index) throws Exception {
+        assertBusy(() -> {
+            ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
+            IndexRoutingTable indexRoutingTable = clusterService.state().getRoutingTable().index(index);
+            assertThat(indexRoutingTable.allPrimaryShardsUnassigned(), equalTo(true));
+        });
+    }
+}
diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
index c59536a153076..7bb38a2216da2 100644
--- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
@@ -37,6 +37,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.shard.IndexLongFieldRange;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardPath;
@@ -54,11 +55,11 @@
 import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest;
 import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
-import org.joda.time.Instant;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -713,6 +714,7 @@ public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMe
 
         final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         final int numShards = between(1, 3);
+        final String dateType = randomFrom("date", "date_nanos");
         assertAcked(
             client().admin()
                 .indices()
@@ -723,7 +725,7 @@ public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMe
                     .startObject()
                     .startObject("properties")
                     .startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
-                    .field("type", "date_nanos")
+                    .field("type", dateType)
                     .field("format", "strict_date_optional_time_nanos")
                     .endObject()
                     .endObject()
@@ -787,8 +789,11 @@ public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMe
             assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY));
         } else {
             assertThat(timestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
-            assertThat(timestampMillisRange.getMin(), greaterThanOrEqualTo(Instant.parse("2020-11-26T00:00:00Z").getMillis()));
-            assertThat(timestampMillisRange.getMin(), lessThanOrEqualTo(Instant.parse("2020-11-27T00:00:00Z").getMillis()));
+            DateFieldMapper.Resolution resolution = dateType.equals("date")
+                ? DateFieldMapper.Resolution.MILLISECONDS
+                : DateFieldMapper.Resolution.NANOSECONDS;
+            assertThat(timestampMillisRange.getMin(), greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-26T00:00:00Z"))));
+            assertThat(timestampMillisRange.getMin(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-27T00:00:00Z"))));
         }
     }