diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
index cba3f48a32fe8..30805d67e7fe3 100644
--- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -237,7 +237,7 @@ void skipShard(SearchShardIterator iterator) {
         successfulShardExecution(iterator);
     }
 
-    private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
+    protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
         /*
          * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
          * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
@@ -306,11 +306,11 @@ protected abstract void executePhaseOnShard(SearchShardIterator shardIt,
                                                 SearchShardTarget shard,
                                                 SearchActionListener<Result> listener);
 
-    private void fork(final Runnable runnable) {
+    protected void fork(final Runnable runnable) {
         executor.execute(new AbstractRunnable() {
             @Override
             public void onFailure(Exception e) {
-
+                assert false : "Unexpected failure";
             }
 
             @Override
@@ -529,7 +529,11 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
     private void successfulShardExecution(SearchShardIterator shardsIt) {
         final int remainingOpsOnIterator;
         if (shardsIt.skip()) {
-            remainingOpsOnIterator = shardsIt.remaining();
+            // It's possible that we're skipping a shard that's unavailable
+            // even though its range was available in the IndexMetadata; in that
+            // case shardsIt.remaining() would be 0, while expectedTotalOps
+            // accounts for unavailable shards too.
+            remainingOpsOnIterator = Math.max(shardsIt.remaining(), 1);
         } else {
            remainingOpsOnIterator = shardsIt.remaining() + 1;
        }
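The hunk above changes the skipped-shard accounting; below is a minimal standalone sketch (not part of the patch, all names hypothetical) of the failure mode it guards against: an unassigned shard that is skipped on the coordinator has `remaining() == 0`, but `expectedTotalOps` reserved one operation for it, so counting zero would leave the phase waiting forever.

```java
// Illustrative sketch only; the real bookkeeping lives in
// AbstractSearchAsyncAction.successfulShardExecution().
public class SkipAccountingSketch {
    static int opsForSkippedIterator(int remainingCopies) {
        // An unassigned shard has no copies left to iterate, yet one op was
        // reserved for it; Math.max keeps totalOps in sync with expectedTotalOps.
        return Math.max(remainingCopies, 1);
    }

    public static void main(String[] args) {
        int expectedTotalOps = 3;                   // three single-copy shards
        int totalOps = opsForSkippedIterator(1)     // skipped, copy assigned
            + opsForSkippedIterator(0)              // skipped, unassigned
            + 1;                                    // queried shard: remaining() + 1
        assert totalOps == expectedTotalOps;
    }
}
```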
diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
index a1dfc602fb916..cbc9007d6c3d4 100644
--- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
+++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
@@ -24,10 +24,14 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.index.query.CoordinatorRewriteContext;
+import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchService.CanMatchResponse;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.MinAndMax;
 import org.elasticsearch.search.sort.SortOrder;
@@ -58,6 +62,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {
 
     private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
     private final GroupShardsIterator<SearchShardIterator> shardsIts;
+    private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider;
 
     CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
                                  BiFunction<String, String, Transport.Connection> nodeIdToConnection,
@@ -66,13 +71,14 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {
                                  ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
                                  TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState,
                                  SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
-                                 SearchResponse.Clusters clusters) {
+                                 SearchResponse.Clusters clusters, CoordinatorRewriteContextProvider coordinatorRewriteContextProvider) {
         //We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
         super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
             listener, shardsIts, timeProvider, clusterState, task, new CanMatchSearchPhaseResults(shardsIts.size()),
             shardsIts.size(), clusters);
         this.phaseFactory = phaseFactory;
         this.shardsIts = shardsIts;
+        this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
     }
 
     @Override
@@ -100,7 +106,17 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhase
         if (cardinality == 0) {
             // this is a special case where we have no hit but we need to get at least one search response in order
             // to produce a valid search result with all the aggs etc.
-            possibleMatches.set(0);
+            // Since some of the shards that we're skipping may be unavailable, try
+            // to query a shard that has at least one copy available in order to
+            // produce a valid search result.
+            int shardIndexToQuery = 0;
+            for (int i = 0; i < shardsIts.size(); i++) {
+                if (shardsIts.get(i).size() > 0) {
+                    shardIndexToQuery = i;
+                    break;
+                }
+            }
+            possibleMatches.set(shardIndexToQuery);
         }
         SearchSourceBuilder source = getRequest().source();
         int i = 0;
@@ -118,6 +134,40 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhase
         return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()));
     }
 
+    @Override
+    protected void performPhaseOnShard(int shardIndex, SearchShardIterator shardIt, SearchShardTarget shard) {
+        CoordinatorRewriteContext coordinatorRewriteContext =
+            coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardIt.shardId().getIndex());
+
+        if (coordinatorRewriteContext == null) {
+            super.performPhaseOnShard(shardIndex, shardIt, shard);
+            return;
+        }
+
+        try {
+            ShardSearchRequest request = buildShardSearchRequest(shardIt, shardIndex);
+            boolean canMatch = SearchService.queryStillMatchesAfterRewrite(request, coordinatorRewriteContext);
+
+            // Trigger the query as there's still a chance that we can skip
+            // this shard given other query filters that we cannot apply
+            // on the coordinator
+            if (canMatch) {
+                super.performPhaseOnShard(shardIndex, shardIt, shard);
+                return;
+            }
+
+            CanMatchResponse result = new CanMatchResponse(canMatch, null);
+            result.setSearchShardTarget(shard == null ? new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias(),
+                shardIt.getOriginalIndices()) : shard);
+            result.setShardIndex(shardIndex);
+            fork(() -> onShardResult(result, shardIt));
+        } catch (Exception e) {
+            // If we fail to rewrite it on the coordinator, just try to execute
+            // the query on the shard.
+            super.performPhaseOnShard(shardIndex, shardIt, shard);
+        }
+    }
+
     private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
                                                         MinAndMax<?>[] minAndMaxes,
                                                         SortOrder order) {
diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
index 6bf8fa4c34421..af95eab85a43c 100644
--- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
@@ -40,8 +40,8 @@
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -787,7 +787,7 @@ public void run() {
                         action.start();
                     }
                 };
-            }, clusters);
+            }, clusters, searchService.getCoordinatorRewriteContextProvider(timeProvider::getAbsoluteStartMillis));
         } else {
             final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(executor, circuitBreaker,
                 task.getProgressListener(), searchRequest, shardIterators.size(),
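Taken together with the wiring above, the coordinator can now answer some can_match decisions without a network round-trip. A hedged sketch of the effect, reusing the patch's types; how `coordinatorRewriteContext` is obtained is schematic here (in the real code it comes from `CoordinatorRewriteContextProvider`), and the collapse to `MatchNoneQueryBuilder` relies on `RangeQueryBuilder`'s existing DISJOINT handling:

```java
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.Rewriteable;

import java.io.IOException;

class CoordinatorSkipSketch {
    // ctx is assumed to describe an index whose @timestamp range is [1000, 2000].
    static boolean shardCanMatch(CoordinatorRewriteContext ctx) throws IOException {
        QueryBuilder query = new RangeQueryBuilder("@timestamp").from(5000).to(6000);
        QueryBuilder rewritten = Rewriteable.rewrite(query, ctx, false);
        // [5000, 6000] is DISJOINT from [1000, 2000], so the rewrite collapses the
        // range query to a MatchNoneQueryBuilder and the shard can be skipped
        // without contacting any data node.
        return rewritten instanceof MatchNoneQueryBuilder == false;
    }
}
```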
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java
index 9243b5e77d6bb..73a8cb13e1b3d 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java
@@ -487,6 +487,20 @@ public Query distanceFeatureQuery(Object origin, String pivot, QueryShardContext
         public Relation isFieldWithinQuery(IndexReader reader,
                                            Object from, Object to, boolean includeLower, boolean includeUpper,
                                            ZoneId timeZone, DateMathParser dateParser, QueryRewriteContext context) throws IOException {
+            if (PointValues.size(reader, name()) == 0) {
+                // no points, so nothing matches
+                return Relation.DISJOINT;
+            }
+
+            long minValue = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, name()), 0);
+            long maxValue = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, name()), 0);
+
+            return isFieldWithinQuery(minValue, maxValue, from, to, includeLower, includeUpper, timeZone, dateParser, context);
+        }
+
+        public Relation isFieldWithinQuery(long minValue, long maxValue,
+                                           Object from, Object to, boolean includeLower, boolean includeUpper,
+                                           ZoneId timeZone, DateMathParser dateParser, QueryRewriteContext context) throws IOException {
             if (dateParser == null) {
                 if (from instanceof Number || to instanceof Number) {
                     // force epoch_millis
@@ -518,14 +532,6 @@ public Relation isFieldWithinQuery(IndexReader reader,
                 }
             }
 
-            if (PointValues.size(reader, name()) == 0) {
-                // no points, so nothing matches
-                return Relation.DISJOINT;
-            }
-
-            long minValue = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, name()), 0);
-            long maxValue = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, name()), 0);
-
             if (minValue >= fromInclusive && maxValue <= toInclusive) {
                 return Relation.WITHIN;
             } else if (maxValue < fromInclusive || minValue > toInclusive) {
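The refactor above splits the reader-based bounds lookup from the pure range comparison, so the coordinator can call the new overload with bounds taken from `IndexMetadata` instead of a Lucene reader. A small sketch of the comparison it performs (mirrors the tail of `isFieldWithinQuery(long, long, ...)`; the values in the comments are made-up milliseconds):

```java
import org.elasticsearch.index.mapper.MappedFieldType.Relation;

class RelationSketch {
    static Relation relate(long minValue, long maxValue, long fromInclusive, long toInclusive) {
        if (minValue >= fromInclusive && maxValue <= toInclusive) {
            return Relation.WITHIN;      // data [10, 20] vs query [5, 25]: every doc matches
        } else if (maxValue < fromInclusive || minValue > toInclusive) {
            return Relation.DISJOINT;    // data [10, 20] vs query [21, 30]: nothing matches
        } else {
            return Relation.INTERSECTS;  // data [10, 20] vs query [15, 30]: must run the query
        }
    }
}
```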
diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java
new file mode 100644
index 0000000000000..a4840a75e19df
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.query;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.shard.IndexLongFieldRange;
+
+import java.util.function.LongSupplier;
+
+/**
+ * Context object used to rewrite {@link QueryBuilder} instances into simplified versions on the coordinator.
+ * Instances of this object rely on information stored in the {@code IndexMetadata} for certain indices.
+ * Right now this context object is able to rewrite range queries that include a known timestamp field
+ * (i.e. the timestamp field for DataStreams) into a MatchNoneQueryBuilder and skip the shards that
+ * don't hold queried data. See IndexMetadata#getTimestampMillisRange() for more details.
+ */
+public class CoordinatorRewriteContext extends QueryRewriteContext {
+    private final Index index;
+    private IndexLongFieldRange indexLongFieldRange;
+    private final DateFieldMapper.DateFieldType timestampFieldType;
+
+    public CoordinatorRewriteContext(NamedXContentRegistry xContentRegistry,
+                                     NamedWriteableRegistry writeableRegistry,
+                                     Client client,
+                                     LongSupplier nowInMillis,
+                                     Index index,
+                                     IndexLongFieldRange indexLongFieldRange,
+                                     DateFieldMapper.DateFieldType timestampFieldType) {
+        super(xContentRegistry, writeableRegistry, client, nowInMillis);
+        this.index = index;
+        this.indexLongFieldRange = indexLongFieldRange;
+        this.timestampFieldType = timestampFieldType;
+    }
+
+    long getMinTimestamp() {
+        return indexLongFieldRange.getMin();
+    }
+
+    long getMaxTimestamp() {
+        return indexLongFieldRange.getMax();
+    }
+
+    boolean hasTimestampData() {
+        return indexLongFieldRange.isComplete() && indexLongFieldRange != IndexLongFieldRange.EMPTY;
+    }
+
+    @Nullable
+    public MappedFieldType getFieldType(String fieldName) {
+        if (fieldName.equals(timestampFieldType.name()) == false) {
+            return null;
+        }
+
+        return timestampFieldType;
+    }
+
+    @Override
+    public CoordinatorRewriteContext convertToCoordinatorRewriteContext() {
+        return this;
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java
new file mode 100644
index 0000000000000..43170493d27ac
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.query;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.shard.IndexLongFieldRange;
+
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+public class CoordinatorRewriteContextProvider {
+    private final NamedXContentRegistry xContentRegistry;
+    private final NamedWriteableRegistry writeableRegistry;
+    private final Client client;
+    private final LongSupplier nowInMillis;
+    private final Supplier<ClusterState> clusterStateSupplier;
+    private final Function<Index, DateFieldMapper.DateFieldType> mappingSupplier;
+
+    public CoordinatorRewriteContextProvider(NamedXContentRegistry xContentRegistry,
+                                             NamedWriteableRegistry writeableRegistry,
+                                             Client client,
+                                             LongSupplier nowInMillis,
+                                             Supplier<ClusterState> clusterStateSupplier,
+                                             Function<Index, DateFieldMapper.DateFieldType> mappingSupplier) {
+        this.xContentRegistry = xContentRegistry;
+        this.writeableRegistry = writeableRegistry;
+        this.client = client;
+        this.nowInMillis = nowInMillis;
+        this.clusterStateSupplier = clusterStateSupplier;
+        this.mappingSupplier = mappingSupplier;
+    }
+
+    @Nullable
+    public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
+        ClusterState clusterState = clusterStateSupplier.get();
+        IndexMetadata indexMetadata = clusterState.metadata().index(index);
+
+        if (indexMetadata == null || indexMetadata.getTimestampMillisRange().containsAllShardRanges() == false) {
+            return null;
+        }
+
+        DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);
+
+        if (dateFieldType == null) {
+            return null;
+        }
+
+        IndexLongFieldRange timestampMillisRange = indexMetadata.getTimestampMillisRange();
+        return new CoordinatorRewriteContext(xContentRegistry,
+            writeableRegistry,
+            client,
+            nowInMillis,
+            index,
+            timestampMillisRange,
+            dateFieldType
+        );
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java b/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java
index ead32047e113d..a3c644cf85930 100644
--- a/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java
+++ b/server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java
@@ -75,6 +75,10 @@ public QueryShardContext convertToShardContext() {
         return null;
     }
 
+    public CoordinatorRewriteContext convertToCoordinatorRewriteContext() {
+        return null;
+    }
+
     /**
      * Registers an async action that must be executed before the next rewrite round in order to make progress.
      * This should be used if a rewriteabel needs to fetch some external resources in order to be executed ie. a document
diff --git a/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java
index 2c3cc5797fca3..ba1478b0fc84f 100644
--- a/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java
+++ b/server/src/main/java/org/elasticsearch/index/query/RangeQueryBuilder.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.common.time.DateMathParser;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 
@@ -428,6 +429,22 @@ public String getWriteableName() {
 
     // Overridable for testing only
     protected MappedFieldType.Relation getRelation(QueryRewriteContext queryRewriteContext) throws IOException {
+        CoordinatorRewriteContext coordinatorRewriteContext = queryRewriteContext.convertToCoordinatorRewriteContext();
+        if (coordinatorRewriteContext != null) {
+            final MappedFieldType fieldType = coordinatorRewriteContext.getFieldType(fieldName);
+            if (fieldType instanceof DateFieldMapper.DateFieldType) {
+                final DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) fieldType;
+                if (coordinatorRewriteContext.hasTimestampData() == false) {
+                    return MappedFieldType.Relation.DISJOINT;
+                }
+                long minTimestamp = coordinatorRewriteContext.getMinTimestamp();
+                long maxTimestamp = coordinatorRewriteContext.getMaxTimestamp();
+                DateMathParser dateMathParser = getForceDateParser();
+                return dateFieldType.isFieldWithinQuery(minTimestamp, maxTimestamp, from, to, includeLower,
+                    includeUpper, timeZone, dateMathParser, queryRewriteContext);
+            }
+        }
+
         QueryShardContext shardContext = queryRewriteContext.convertToShardContext();
         if (shardContext != null) {
             final MappedFieldType fieldType = shardContext.getFieldType(fieldName);
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java
index eb4b3ced2a14a..319fd6f69856d 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java
@@ -76,6 +76,13 @@ public boolean isComplete() {
         return shards == null;
     }
 
+    /**
+     * @return whether this range includes information from all shards and can be used meaningfully.
+     */
+    public boolean containsAllShardRanges() {
+        return isComplete() && this != IndexLongFieldRange.UNKNOWN;
+    }
+
     // exposed for testing
     int[] getShards() {
         return shards;
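A comment-only summary (my reading of the code above, not text from the patch) of how `containsAllShardRanges()` and the context's `hasTimestampData()` compose in `RangeQueryBuilder.getRelation`:

```java
// Reading the two guards together (constants from IndexLongFieldRange):
//
//   range == UNKNOWN   -> containsAllShardRanges() == false
//                         -> CoordinatorRewriteContextProvider returns null, no coordinator rewrite
//   range == EMPTY     -> containsAllShardRanges() == true, hasTimestampData() == false
//                         -> getRelation(...) returns DISJOINT: no shard holds a timestamp value
//   range == [min,max] -> both checks pass
//                         -> the relation is computed from min/max against the query bounds
```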
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 9211af62ac2a7..29312bac5622a 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1753,14 +1753,7 @@ public ShardLongFieldRange getTimestampMillisRange() {
             return ShardLongFieldRange.EMPTY;
         }
 
-        try {
-            return ShardLongFieldRange.of(
-                dateFieldType.resolution().roundDownToMillis(rawTimestampFieldRange.getMin()),
-                dateFieldType.resolution().roundUpToMillis(rawTimestampFieldRange.getMax()));
-        } catch (IllegalArgumentException e) {
-            logger.debug(new ParameterizedMessage("could not convert {} to a millisecond time range", rawTimestampFieldRange), e);
-            return ShardLongFieldRange.UNKNOWN; // any search might match this shard
-        }
+        return ShardLongFieldRange.of(rawTimestampFieldRange.getMin(), rawTimestampFieldRange.getMax());
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
index a669a91d0ba16..d9a19a37ff0d2 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -102,6 +102,7 @@
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.recovery.RecoveryStats;
@@ -1532,6 +1533,15 @@ public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
         return new QueryRewriteContext(xContentRegistry, namedWriteableRegistry, client, nowInMillis);
     }
 
+    public CoordinatorRewriteContextProvider getCoordinatorRewriteContextProvider(LongSupplier nowInMillis) {
+        return new CoordinatorRewriteContextProvider(xContentRegistry,
+            namedWriteableRegistry,
+            client,
+            nowInMillis,
+            clusterService::state,
+            this::getTimestampFieldType);
+    }
+
     /**
      * Clears the caches for the given shard id if the shard is still allocated on this node
      */
diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java
index 4b50193496c9d..fff0942f10bd6 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -59,6 +59,7 @@
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
 import org.elasticsearch.index.query.InnerHitContextBuilder;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.MatchNoneQueryBuilder;
@@ -1189,24 +1190,34 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre
             try (canMatchSearcher) {
                 QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), canMatchSearcher,
                     request::nowInMillis, request.getClusterAlias(), request.getRuntimeMappings());
-                Rewriteable.rewrite(request.getRewriteable(), context, false);
-                final boolean aliasFilterCanMatch = request.getAliasFilter()
-                    .getQueryBuilder() instanceof MatchNoneQueryBuilder == false;
-                FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
-                MinAndMax<?> minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
-                final boolean canMatch;
-                if (canRewriteToMatchNone(request.source())) {
-                    QueryBuilder queryBuilder = request.source().query();
-                    canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false;
+                final boolean canMatch = queryStillMatchesAfterRewrite(request, context);
+                final MinAndMax<?> minMax;
+                if (canMatch || hasRefreshPending) {
+                    FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
+                    minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
                 } else {
-                    // null query means match_all
-                    canMatch = aliasFilterCanMatch;
+                    minMax = null;
                 }
                 return new CanMatchResponse(canMatch || hasRefreshPending, minMax);
             }
         }
     }
 
+    public static boolean queryStillMatchesAfterRewrite(ShardSearchRequest request, QueryRewriteContext context) throws IOException {
+        Rewriteable.rewrite(request.getRewriteable(), context, false);
+        final boolean aliasFilterCanMatch = request.getAliasFilter()
+            .getQueryBuilder() instanceof MatchNoneQueryBuilder == false;
+        final boolean canMatch;
+        if (canRewriteToMatchNone(request.source())) {
+            QueryBuilder queryBuilder = request.source().query();
+            canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false;
+        } else {
+            // null query means match_all
+            canMatch = aliasFilterCanMatch;
+        }
+        return canMatch;
+    }
+
     /**
      * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words
      * if the execution of the search request can be early terminated without executing it. This is for instance not possible if
@@ -1242,6 +1253,10 @@ public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
         return indicesService.getRewriteContext(nowInMillis);
     }
 
+    public CoordinatorRewriteContextProvider getCoordinatorRewriteContextProvider(LongSupplier nowInMillis) {
+        return indicesService.getCoordinatorRewriteContextProvider(nowInMillis);
+    }
+
     public IndicesService getIndicesService() {
         return indicesService;
     }
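Extracting `queryStillMatchesAfterRewrite` lets the data-node `canMatch(...)` path and the new coordinator pre-filter share one definition of "can this shard match"; only the rewrite context differs. A sketch of the shared call shape (the variables are stand-ins for the real ones):

```java
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.ShardSearchRequest;

import java.io.IOException;

class SharedCanMatchSketch {
    // Both call sites reduce to the same helper: rewrite the request, then report
    // "no match" only when the alias filter or the rewritten query collapsed to a
    // MatchNoneQueryBuilder.
    static boolean canMatch(ShardSearchRequest request, QueryRewriteContext context) throws IOException {
        return SearchService.queryStillMatchesAfterRewrite(request, context);
    }
}
```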
diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java
index ab5731fbcdeb2..7b8e6879b17e0 100644
--- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java
@@ -22,12 +22,29 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.query.AbstractQueryBuilder;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.index.shard.IndexLongFieldRange;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardLongFieldRange;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchShardTarget;
@@ -44,6 +61,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -53,12 +71,18 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.elasticsearch.action.search.SearchAsyncActionTests.getShardsIter;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 
 public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 
+    private final CoordinatorRewriteContextProvider EMPTY_CONTEXT_PROVIDER = new StaticCoordinatorRewriteContextProviderBuilder().build();
+
     public void testFilterShards() throws InterruptedException {
 
         final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0,
             System.nanoTime(),
@@ -83,7 +107,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req
 
         AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("idx",
+        GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
             new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
             2, randomBoolean(), primaryNode, replicaNode);
         final SearchRequest searchRequest = new SearchRequest();
@@ -100,7 +124,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req
             public void run() throws IOException {
                 result.set(iter);
                 latch.countDown();
-            }}, SearchResponse.Clusters.EMPTY);
+            }}, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER);
 
         canMatchPhase.start();
         latch.await();
@@ -150,7 +174,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req
 
         AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("idx",
+        GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
             new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
             2, randomBoolean(), primaryNode, replicaNode);
 
@@ -168,7 +192,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req
             public void run() throws IOException {
                 result.set(iter);
                 latch.countDown();
-            }}, SearchResponse.Clusters.EMPTY);
+            }}, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER);
 
         canMatchPhase.start();
         latch.await();
@@ -209,7 +233,7 @@ public void sendCanMatch(
         final CountDownLatch latch = new CountDownLatch(1);
         final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS);
         final GroupShardsIterator<SearchShardIterator> shardsIter =
-            SearchAsyncActionTests.getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode);
+            getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode);
         final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors()));
         final SearchRequest searchRequest = new SearchRequest();
         searchRequest.allowPartialSearchResults(true);
@@ -272,7 +296,7 @@ protected void executePhaseOnShard(
                     listener.onFailure(new Exception("failure"));
                 }
             }
-        }, SearchResponse.Clusters.EMPTY);
+        }, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER);
         canMatchPhase.start();
         latch.await();
@@ -315,7 +339,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req
 
         AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("logs",
+        GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("logs",
             new OriginalIndices(new String[]{"logs"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
             randomIntBetween(2, 20), randomBoolean(), primaryNode, replicaNode);
         final SearchRequest searchRequest = new SearchRequest();
@@ -334,7 +358,7 @@ public void run() {
                     result.set(iter);
                     latch.countDown();
                 }
-            }, SearchResponse.Clusters.EMPTY);
+            }, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER);
 
         canMatchPhase.start();
         latch.await();
@@ -392,7 +416,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req
 
         AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("logs",
+        GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("logs",
             new OriginalIndices(new String[]{"logs"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
             numShards, randomBoolean(), primaryNode, replicaNode);
         final SearchRequest searchRequest = new SearchRequest();
@@ -411,7 +435,7 @@ public void run() {
                     result.set(iter);
                     latch.countDown();
                 }
-            }, SearchResponse.Clusters.EMPTY);
+            }, SearchResponse.Clusters.EMPTY, EMPTY_CONTEXT_PROVIDER);
 
         canMatchPhase.start();
         latch.await();
@@ -423,4 +447,357 @@ public void run() {
             assertThat(result.get().size(), equalTo(numShards));
         }
     }
+
+    public void testCanMatchFilteringOnCoordinatorThatCanBeSkipped() throws Exception {
+        Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID());
+        Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID());
+        DataStream dataStream =
+            new DataStream("mydata", new DataStream.TimestampField("@timestamp"), List.of(dataStreamIndex1, dataStreamIndex2));
+
+        List<Index> regularIndices =
+            randomList(0, 2, () -> new Index(randomAlphaOfLength(10), UUIDs.base64UUID()));
+
+        long indexMinTimestamp = randomLongBetween(0, 5000);
+        long indexMaxTimestamp = randomLongBetween(indexMinTimestamp, 5000 * 2);
+        StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder();
+        String timestampFieldName = dataStream.getTimeStampField().getName();
+        for (Index dataStreamIndex : dataStream.getIndices()) {
+            contextProviderBuilder.addIndexMinMaxTimestamps(dataStreamIndex, timestampFieldName, indexMinTimestamp, indexMaxTimestamp);
+        }
+
+        RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName);
+        // We query a range outside of the timestamp range covered by both datastream indices
+        rangeQueryBuilder
+            .from(indexMaxTimestamp + 1)
+            .to(indexMaxTimestamp + 2);
+
+        BoolQueryBuilder queryBuilder = new BoolQueryBuilder()
+            .filter(rangeQueryBuilder);
+
+        if (randomBoolean()) {
+            // Add an additional filter that cannot be evaluated in the coordinator but shouldn't
+            // affect the end result as we're filtering
+            queryBuilder.filter(new TermQueryBuilder("fake", "value"));
+        }
+
+        assignShardsAndExecuteCanMatchPhase(dataStream, regularIndices, contextProviderBuilder.build(), queryBuilder,
+            (updatedSearchShardIterators, requests) -> {
+                List<SearchShardIterator> skippedShards = updatedSearchShardIterators.stream()
+                    .filter(SearchShardIterator::skip)
+                    .collect(Collectors.toList());
+
+                List<SearchShardIterator> nonSkippedShards = updatedSearchShardIterators.stream()
+                    .filter(searchShardIterator -> searchShardIterator.skip() == false)
+                    .collect(Collectors.toList());
+
+                int regularIndexShardCount = (int) updatedSearchShardIterators.stream()
+                    .filter(s -> regularIndices.contains(s.shardId().getIndex()))
+                    .count();
+
+                // When all the shards can be skipped we should query at least 1
+                // in order to get a valid search response.
+                if (regularIndexShardCount == 0) {
+                    assertThat(nonSkippedShards.size(), equalTo(1));
+                } else {
+                    boolean allNonSkippedShardsAreFromRegularIndices = nonSkippedShards.stream()
+                        .allMatch(shardIterator -> regularIndices.contains(shardIterator.shardId().getIndex()));
+
+                    assertThat(allNonSkippedShardsAreFromRegularIndices, equalTo(true));
+                }
+
+                boolean allSkippedShardAreFromDataStream = skippedShards.stream()
+                    .allMatch(shardIterator -> dataStream.getIndices().contains(shardIterator.shardId().getIndex()));
+                assertThat(allSkippedShardAreFromDataStream, equalTo(true));
+
+                boolean allRequestsWereTriggeredAgainstRegularIndices = requests.stream()
+                    .allMatch(request -> regularIndices.contains(request.shardId().getIndex()));
+                assertThat(allRequestsWereTriggeredAgainstRegularIndices, equalTo(true));
+            });
+    }
+
+    public void testCanMatchFilteringOnCoordinatorParsingFails() throws Exception {
+        Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID());
+        Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID());
+        DataStream dataStream =
+            new DataStream("mydata", new DataStream.TimestampField("@timestamp"), List.of(dataStreamIndex1, dataStreamIndex2));
+
+        List<Index> regularIndices =
+            randomList(0, 2, () -> new Index(randomAlphaOfLength(10), UUIDs.base64UUID()));
+
+        long indexMinTimestamp = randomLongBetween(0, 5000);
+        long indexMaxTimestamp = randomLongBetween(indexMinTimestamp, 5000 * 2);
+        StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder();
+        String timestampFieldName = dataStream.getTimeStampField().getName();
+        for (Index dataStreamIndex : dataStream.getIndices()) {
+            contextProviderBuilder.addIndexMinMaxTimestamps(dataStreamIndex, timestampFieldName, indexMinTimestamp, indexMaxTimestamp);
+        }
+
+        RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName);
+        // Query with a non-default date format
+        rangeQueryBuilder
+            .from("2020-1-01")
+            .to("2021-1-01");
+
+        BoolQueryBuilder queryBuilder = new BoolQueryBuilder()
+            .filter(rangeQueryBuilder);
+
+        if (randomBoolean()) {
+            // Add an additional filter that cannot be evaluated in the coordinator but shouldn't
+            // affect the end result as we're filtering
+            queryBuilder.filter(new TermQueryBuilder("fake", "value"));
+        }
+
+        assignShardsAndExecuteCanMatchPhase(dataStream,
+            regularIndices,
+            contextProviderBuilder.build(),
+            queryBuilder,
+            this::assertAllShardsAreQueried
+        );
+    }
+
+    public void testCanMatchFilteringOnCoordinatorThatCanNotBeSkipped() throws Exception {
+        // Generate indices
+        Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID());
+        Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID());
+        DataStream dataStream =
+            new DataStream("mydata", new DataStream.TimestampField("@timestamp"), List.of(dataStreamIndex1, dataStreamIndex2));
+
+        List<Index> regularIndices =
+            randomList(0, 2, () -> new Index(randomAlphaOfLength(10), UUIDs.base64UUID()));
+
+        long indexMinTimestamp = 10;
+        long indexMaxTimestamp = 20;
+        StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder();
+        String timestampFieldName = dataStream.getTimeStampField().getName();
+        for (Index dataStreamIndex : dataStream.getIndices()) {
+            contextProviderBuilder.addIndexMinMaxTimestamps(dataStreamIndex, timestampFieldName, indexMinTimestamp, indexMaxTimestamp);
+        }
+
+        BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
+        // Query inside of the data stream index range
+        if (randomBoolean()) {
+            // Query generation
+            RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName);
+            // We query a range within the timestamp range covered by both datastream indices
+            rangeQueryBuilder
+                .from(indexMinTimestamp)
+                .to(indexMaxTimestamp);
+
+            queryBuilder.filter(rangeQueryBuilder);
+
+            if (randomBoolean()) {
+                // Add an additional filter that cannot be evaluated in the coordinator but shouldn't
+                // affect the end result as we're filtering
+                queryBuilder.filter(new TermQueryBuilder("fake", "value"));
+            }
+        } else {
+            // We query a range outside of the timestamp range covered by both datastream indices
+            RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder(timestampFieldName)
+                .from(indexMaxTimestamp + 1)
+                .to(indexMaxTimestamp + 2);
+
+            TermQueryBuilder termQueryBuilder = new TermQueryBuilder("fake", "value");
+
+            // This is always evaluated as true in the coordinator as we cannot determine there if
+            // the term query clause is false.
+            queryBuilder.should(rangeQueryBuilder)
+                .should(termQueryBuilder);
+        }
+
+        assignShardsAndExecuteCanMatchPhase(dataStream,
+            regularIndices,
+            contextProviderBuilder.build(),
+            queryBuilder,
+            this::assertAllShardsAreQueried
+        );
+    }
+
+    private void assertAllShardsAreQueried(List<SearchShardIterator> updatedSearchShardIterators, List<ShardSearchRequest> requests) {
+        int skippedShards = (int) updatedSearchShardIterators.stream()
+            .filter(SearchShardIterator::skip)
+            .count();
+
+        assertThat(skippedShards, equalTo(0));
+
+        int nonSkippedShards = (int) updatedSearchShardIterators.stream()
+            .filter(searchShardIterator -> searchShardIterator.skip() == false)
+            .count();
+
+        assertThat(nonSkippedShards, equalTo(updatedSearchShardIterators.size()));
+
+        int shardsWithPrimariesAssigned = (int) updatedSearchShardIterators.stream()
+            .filter(s -> s.size() > 0)
+            .count();
+        assertThat(requests.size(), equalTo(shardsWithPrimariesAssigned));
+    }
+
+    private <QB extends AbstractQueryBuilder<QB>>
+    void assignShardsAndExecuteCanMatchPhase(DataStream dataStream,
+                                             List<Index> regularIndices,
+                                             CoordinatorRewriteContextProvider contextProvider,
+                                             AbstractQueryBuilder<QB> query,
+                                             BiConsumer<List<SearchShardIterator>,
+                                                 List<ShardSearchRequest>> canMatchResultsConsumer) throws Exception {
+        Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
+        DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
+        DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
+        lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
+        lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
+
+        List<String> indicesToSearch = new ArrayList<>();
+        indicesToSearch.add(dataStream.getName());
+        for (Index regularIndex : regularIndices) {
+            indicesToSearch.add(regularIndex.getName());
+        }
+
+        String[] indices = indicesToSearch.toArray(new String[0]);
+        OriginalIndices originalIndices = new OriginalIndices(indices, SearchRequest.DEFAULT_INDICES_OPTIONS);
+
+        boolean atLeastOnePrimaryAssigned = false;
+        final List<SearchShardIterator> originalShardIters = new ArrayList<>();
+        for (Index dataStreamIndex : dataStream.getIndices()) {
+            // If we have to execute the can match request against all the shards
+            // and none is assigned, the phase is considered as failed meaning that the next phase won't be executed
+            boolean withAssignedPrimaries = randomBoolean() || atLeastOnePrimaryAssigned == false;
+            int numShards = randomIntBetween(1, 6);
+            originalShardIters.addAll(
+                getShardsIter(dataStreamIndex,
+                    originalIndices,
+                    numShards,
+                    false,
+                    withAssignedPrimaries ? primaryNode : null,
+                    null)
+            );
+            atLeastOnePrimaryAssigned |= withAssignedPrimaries;
+        }
+
+        for (Index regularIndex : regularIndices) {
+            originalShardIters.addAll(
+                getShardsIter(regularIndex,
+                    originalIndices,
+                    randomIntBetween(1, 6),
+                    randomBoolean(),
+                    primaryNode,
+                    replicaNode)
+            );
+        }
+        GroupShardsIterator<SearchShardIterator> shardsIter = GroupShardsIterator.sortAndCreate(originalShardIters);
+
+        final SearchRequest searchRequest = new SearchRequest();
+        searchRequest.indices(indices);
+        searchRequest.allowPartialSearchResults(true);
+
+        final AliasFilter aliasFilter;
+        if (randomBoolean()) {
+            // Apply the query on the request body
+            SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource();
+            searchSourceBuilder.query(query);
+            searchRequest.source(searchSourceBuilder);
+
+            // Sometimes apply the same query in the alias filter too
+            aliasFilter = new AliasFilter(randomBoolean() ? query : null, Strings.EMPTY_ARRAY);
+        } else {
+            // Apply the query as an alias filter
+            aliasFilter = new AliasFilter(query, Strings.EMPTY_ARRAY);
+        }
+
+        Map<String, AliasFilter> aliasFilters = new HashMap<>();
+        for (Index dataStreamIndex : dataStream.getIndices()) {
+            aliasFilters.put(dataStreamIndex.getUUID(), aliasFilter);
+        }
+
+        for (Index regularIndex : regularIndices) {
+            aliasFilters.put(regularIndex.getUUID(), aliasFilter);
+        }
+
+        // We respond by default that the query can match
+        final List<ShardSearchRequest> requests = Collections.synchronizedList(new ArrayList<>());
+        SearchTransportService searchTransportService = new SearchTransportService(null, null, null) {
+            @Override
+            public void sendCanMatch(Transport.Connection connection, ShardSearchRequest request, SearchTask task,
+                                     ActionListener<SearchService.CanMatchResponse> listener) {
+                requests.add(request);
+                listener.onResponse(new SearchService.CanMatchResponse(true, null));
+            }
+        };
+
+        final TransportSearchAction.SearchTimeProvider timeProvider =
+            new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime);
+
+        AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger,
+            searchTransportService,
+            (clusterAlias, node) -> lookup.get(node),
+            aliasFilters,
+            Collections.emptyMap(),
+            EsExecutors.newDirectExecutorService(),
+            searchRequest,
+            null,
+            shardsIter,
+            timeProvider,
+            ClusterState.EMPTY_STATE,
+            null,
+            (iter) -> new SearchPhase("test") {
+                @Override
+                public void run() throws IOException {
+                    result.set(iter);
+                    latch.countDown();
+                }
+            },
+            SearchResponse.Clusters.EMPTY,
+            contextProvider);
+
+        canMatchPhase.start();
+        latch.await();
+
+        List<SearchShardIterator> updatedSearchShardIterators = new ArrayList<>();
+        for (SearchShardIterator updatedSearchShardIterator : result.get()) {
+            updatedSearchShardIterators.add(updatedSearchShardIterator);
+        }
+
+        canMatchResultsConsumer.accept(updatedSearchShardIterators, requests);
+    }
+
+    private static class StaticCoordinatorRewriteContextProviderBuilder {
+        private ClusterState clusterState = ClusterState.EMPTY_STATE;
+        private final Map<Index, DateFieldMapper.DateFieldType> fields = new HashMap<>();
+
+        private void addIndexMinMaxTimestamps(Index index, String fieldName, long minTimeStamp, long maxTimestamp) {
+            if (clusterState.metadata().index(index) != null) {
+                throw new IllegalArgumentException("Min/Max timestamps for " + index + " were already defined");
+            }
+
+            IndexLongFieldRange timestampMillisRange = IndexLongFieldRange.NO_SHARDS
+                .extendWithShardRange(0, 1, ShardLongFieldRange.of(minTimeStamp, maxTimestamp));
+
+            Settings.Builder indexSettings = settings(Version.CURRENT)
+                .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID());
+
+            IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName())
+                .settings(indexSettings)
+                .numberOfShards(1)
+                .numberOfReplicas(0)
+                .timestampMillisRange(timestampMillisRange);
+
+            Metadata.Builder metadataBuilder =
+                Metadata.builder(clusterState.metadata())
+                    .put(indexMetadataBuilder);
+
+            clusterState = ClusterState.builder(clusterState)
+                .metadata(metadataBuilder)
+                .build();
+
+            fields.put(index, new DateFieldMapper.DateFieldType(fieldName));
+        }
+
+        public CoordinatorRewriteContextProvider build() {
+            return new CoordinatorRewriteContextProvider(NamedXContentRegistry.EMPTY,
+                mock(NamedWriteableRegistry.class),
+                mock(Client.class),
+                System::currentTimeMillis,
+                () -> clusterState,
+                fields::get);
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java
index 06a3e3b8ee549..69a6025b09706 100644
--- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java
@@ -48,12 +48,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -61,6 +63,7 @@
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 public class SearchAsyncActionTests extends ESTestCase {
@@ -575,26 +578,107 @@ protected void executeNext(Runnable runnable, Thread originalThread) {
         assertThat(numFailReplicas.get(), greaterThanOrEqualTo(1));
     }
 
+    public void testSkipUnavailableSearchShards() throws InterruptedException {
+        SearchRequest request = new SearchRequest();
+        request.allowPartialSearchResults(true);
+        ActionListener<SearchResponse> responseListener = ActionListener.wrap(response -> {},
+            (e) -> { throw new AssertionError("unexpected", e); });
+        DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
+
+        final int numUnavailableSkippedShards = randomIntBetween(1, 10);
+        List<SearchShardIterator> searchShardIterators = new ArrayList<>(numUnavailableSkippedShards);
+        OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS);
+        for (int i = 0; i < numUnavailableSkippedShards; i++) {
+            Index index = new Index("idx", "_na_");
+            SearchShardIterator searchShardIterator =
+                new SearchShardIterator(null, new ShardId(index, 0), Collections.emptyList(), originalIndices);
+            // Skip all the shards
+            searchShardIterator.resetAndSkip();
+            searchShardIterators.add(searchShardIterator);
+        }
+        GroupShardsIterator<SearchShardIterator> shardsIter = new GroupShardsIterator<>(searchShardIterators);
+        Map<String, Transport.Connection> lookup = Map.of(primaryNode.getId(), new MockConnection(primaryNode));
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean searchPhaseDidRun = new AtomicBoolean(false);
+        AbstractSearchAsyncAction<SearchPhaseResult> asyncAction =
+            new AbstractSearchAsyncAction<>(
+                "test",
+                logger,
+                new SearchTransportService(null, null, null),
+                (cluster, node) -> {
+                    assert cluster == null : "cluster was not null: " + cluster;
+                    return lookup.get(node); },
+                Map.of("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
+                Collections.emptyMap(),
+                null,
+                request,
+                responseListener,
+                shardsIter,
+                new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
+                ClusterState.EMPTY_STATE,
+                null,
+                new ArraySearchPhaseResults<>(shardsIter.size()),
+                request.getMaxConcurrentShardRequests(),
+                SearchResponse.Clusters.EMPTY) {
+
+                @Override
+                protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
+                                                   SearchActionListener<SearchPhaseResult> listener) {
+                    assert false : "Expected to skip all shards";
+                }
+
+                @Override
+                protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
+                    return new SearchPhase("test") {
+                        @Override
+                        public void run() {
+                            assertTrue(searchPhaseDidRun.compareAndSet(false, true));
+                            latch.countDown();
+                        }
+                    };
+                }
+            };
+        asyncAction.start();
+        assertThat(latch.await(4, TimeUnit.SECONDS), equalTo(true));
+        assertThat(searchPhaseDidRun.get(), equalTo(true));
+
+        SearchResponse searchResponse =
+            asyncAction.buildSearchResponse(null, asyncAction.buildShardFailures(), null, null);
+        assertThat(searchResponse.getSkippedShards(), equalTo(numUnavailableSkippedShards));
+        assertThat(searchResponse.getFailedShards(), equalTo(0));
+        assertThat(searchResponse.getSuccessfulShards(), equalTo(shardsIter.size()));
+    }
+
     static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,
                                                                   boolean doReplicas, DiscoveryNode primaryNode,
                                                                   DiscoveryNode replicaNode) {
+        return new GroupShardsIterator<>(
+            getShardsIter(new Index(index, "_na_"), originalIndices, numShards, doReplicas, primaryNode, replicaNode)
+        );
+    }
+
+    static List<SearchShardIterator> getShardsIter(Index index, OriginalIndices originalIndices, int numShards,
+                                                   boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) {
         ArrayList<SearchShardIterator> list = new ArrayList<>();
         for (int i = 0; i < numShards; i++) {
             ArrayList<ShardRouting> started = new ArrayList<>();
             ArrayList<ShardRouting> initializing = new ArrayList<>();
             ArrayList<ShardRouting> unassigned = new ArrayList<>();
 
-            ShardRouting routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), true,
+            ShardRouting routing = ShardRouting.newUnassigned(new ShardId(index, i), true,
                 RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
-            routing = routing.initialize(primaryNode.getId(), i + "p", 0);
-            routing.started();
-            started.add(routing);
-            if (doReplicas) {
-                routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), false,
+            if (primaryNode != null) {
+                routing = routing.initialize(primaryNode.getId(), i + "p", 0);
+                routing = routing.moveToStarted();
+                started.add(routing);
+            }
+            if (doReplicas && primaryNode != null) {
+                routing = ShardRouting.newUnassigned(new ShardId(index, i), false,
                     RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
                 if (replicaNode != null) {
                     routing = routing.initialize(replicaNode.getId(), i + "r", 0);
                     if (randomBoolean()) {
-                        routing.started();
+                        routing = routing.moveToStarted();
                         started.add(routing);
                     } else {
                         initializing.add(routing);
@@ -605,11 +689,12 @@ static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, Orig
         }
         Collections.shuffle(started, random());
         started.addAll(initializing);
-            list.add(new SearchShardIterator(null, new ShardId(new Index(index, "_na_"), i), started, originalIndices));
+            list.add(new SearchShardIterator(null, new ShardId(index, i), started, originalIndices));
         }
-        return new GroupShardsIterator<>(list);
+        return list;
     }
 
+
     public static class TestSearchResponse extends SearchResponse {
 
         final Set<ShardId> queried = new HashSet<>();
diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java
index a9326b65f666d..c343d8c131e06 100644
--- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java
+++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.shard.IndexLongFieldRange;
@@ -53,9 +54,9 @@
 import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse;
 import org.elasticsearch.xpack.frozen.FrozenIndices;
 import org.hamcrest.Matchers;
-import org.joda.time.Instant;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -502,8 +503,8 @@ public void testComputesTimestampRangeFromMilliseconds() {
         assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN)));
         assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
         assertTrue(timestampFieldRange.isComplete());
-        assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis()));
-        assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis()));
+        assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").toEpochMilli()));
+        assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").toEpochMilli()));
 
         for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) {
             assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range",
@@ -534,8 +535,11 @@ public void testComputesTimestampRangeFromNanoseconds() throws IOException {
         assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN)));
         assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
         assertTrue(timestampFieldRange.isComplete());
-        assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis()));
-        assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.568Z").getMillis()));
+        final DateFieldMapper.Resolution resolution = DateFieldMapper.Resolution.NANOSECONDS;
+        assertThat(timestampFieldRange.getMin(),
+            equalTo(resolution.convert(Instant.parse("2010-01-05T01:02:03.456789012Z"))));
+        assertThat(timestampFieldRange.getMax(),
+            equalTo(resolution.convert(Instant.parse("2010-01-06T02:03:04.567890123Z"))));
 
         for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) {
             assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range",
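The nanosecond assertions change because `IndexShard` no longer rounds the raw shard range down to milliseconds (see the IndexShard hunk earlier), so a nanosecond-resolution index now records full-precision values. A standalone illustration (no Elasticsearch dependency) of what the conversion preserves; `DateFieldMapper.Resolution.NANOSECONDS.convert(Instant)` computes the same nanos-since-epoch value:

```java
import java.time.Instant;

public class NanoRangeExample {
    public static void main(String[] args) {
        Instant min = Instant.parse("2010-01-05T01:02:03.456789012Z");
        long millis = min.toEpochMilli();                                   // ...03.456, sub-ms digits lost
        long nanos = min.getEpochSecond() * 1_000_000_000L + min.getNano(); // ...03.456789012 preserved
        System.out.println(millis + " vs " + nanos);
    }
}
```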
equalTo(resolution.convert(Instant.parse("2010-01-06T02:03:04.567890123Z")))); for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) { assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range", diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java new file mode 100644 index 0000000000000..38cb637925e4c --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java @@ -0,0 +1,506 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots; + +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING; +import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import 
static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchableSnapshotsCanMatchOnCoordinatorIntegTests extends BaseSearchableSnapshotsIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(LocalStateSearchableSnapshots.class, MockTransportService.TestPlugin.class, MockRepository.Plugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // Use an unbounded cache so we can fully recover the searchable snapshot every time + .put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)) + .build(); + } + + public void testSearchableSnapshotShardsAreSkippedWithoutQueryingAnyNodeWhenTheyAreOutsideOfTheQueryRange() throws Exception { + internalCluster().startMasterOnlyNode(); + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode(); + final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode(); + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNodeHoldingSearchableSnapshot); + + final String indexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final int indexOutsideSearchRangeShardCount = randomIntBetween(1, 3); + createIndexWithTimestamp(indexOutsideSearchRange, indexOutsideSearchRangeShardCount, Settings.EMPTY); + + final String indexWithinSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final int indexWithinSearchRangeShardCount = randomIntBetween(1, 3); + createIndexWithTimestamp( + indexWithinSearchRange, + indexWithinSearchRangeShardCount, + Settings.builder() + .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex) + .build() + ); + + final int totalShards = indexOutsideSearchRangeShardCount + indexWithinSearchRangeShardCount; + + // Either index data outside of the search range, or documents without any timestamp data + final boolean indexDataWithTimestamp = randomBoolean(); + if (indexDataWithTimestamp) { + indexDocumentsWithTimestampWithinDate(indexOutsideSearchRange, between(0, 1000), "2020-11-26T%02d:%02d:%02d.%09dZ"); + } else { + indexRandomDocs(indexOutsideSearchRange, between(0, 1000)); + } + + int numDocsWithinRange = between(0, 1000); + indexDocumentsWithTimestampWithinDate(indexWithinSearchRange, numDocsWithinRange, "2020-11-28T%02d:%02d:%02d.%09dZ"); + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(repositoryName, "mock"); + + final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indexOutsideSearchRange)).snapshotId(); + assertAcked(client().admin().indices().prepareDelete(indexOutsideSearchRange)); + + final String searchableSnapshotIndexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + // Block the repository for the node holding the searchable snapshot shards + // to delay its restore + blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot); + + // Force the searchable snapshot to be allocated in a particular node + Settings restoredIndexSettings = Settings.builder() +
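// (Aside, illustrative: INDEX_ROUTING_REQUIRE_GROUP_SETTING is a prefix setting, so
// getConcreteSettingForNamespace("_name").getKey() resolves to
// "index.routing.allocation.require._name"; setting it to a node name pins the index's
// shards to that node, which is how these tests control where each shard lives.)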
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()) + .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot) + .build(); + + final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest( + searchableSnapshotIndexOutsideSearchRange, + repositoryName, + snapshotId.getName(), + indexOutsideSearchRange, + restoredIndexSettings, + Strings.EMPTY_ARRAY, + false + ); + client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet(); + + final IndexMetadata indexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange); + assertThat(indexMetadata.getTimestampMillisRange(), equalTo(IndexLongFieldRange.NO_SHARDS)); + + DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(indexMetadata.getIndex()); + assertThat(timestampFieldType, nullValue()); + + final boolean includeIndexCoveringSearchRangeInSearchRequest = randomBoolean(); + List indicesToSearch = new ArrayList<>(); + if (includeIndexCoveringSearchRangeInSearchRequest) { + indicesToSearch.add(indexWithinSearchRange); + } + indicesToSearch.add(searchableSnapshotIndexOutsideSearchRange); + SearchRequest request = new SearchRequest().indices(indicesToSearch.toArray(new String[0])) + .source( + new SearchSourceBuilder().query( + QueryBuilders.rangeQuery(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD) + .from("2020-11-28T00:00:00.000000000Z", true) + .to("2020-11-29T00:00:00.000000000Z") + ) + ); + + if (includeIndexCoveringSearchRangeInSearchRequest) { + SearchResponse searchResponse = client().search(request).actionGet(); + + // All the regular index searches succeeded + assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount)); + // All the searchable snapshot shard searches failed + assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(searchResponse.getSkippedShards(), equalTo(0)); + assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + } else { + // All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp + // is not available yet + expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet()); + } + + // Allow the searchable snapshot to finally be mounted + unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot); + waitUntilRecoveryIsDone(searchableSnapshotIndexOutsideSearchRange); + ensureGreen(searchableSnapshotIndexOutsideSearchRange); + + final IndexMetadata updatedIndexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange); + final IndexLongFieldRange updatedTimestampMillisRange = updatedIndexMetadata.getTimestampMillisRange(); + final DateFieldMapper.DateFieldType dateFieldType = indicesService.getTimestampFieldType(updatedIndexMetadata.getIndex()); + assertThat(dateFieldType, notNullValue()); + final DateFieldMapper.Resolution resolution = dateFieldType.resolution(); + assertThat(updatedTimestampMillisRange.isComplete(), equalTo(true)); + if (indexDataWithTimestamp) { + assertThat(updatedTimestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertThat( + updatedTimestampMillisRange.getMin(), + greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-26T00:00:00Z"))) + ); + assertThat(updatedTimestampMillisRange.getMax(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-27T00:00:00Z")))); + } else {
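// (Aside, illustrative: DateFieldMapper.Resolution.convert(Instant) maps an instant to the
// long value the field actually stores, so the bounds above compare in the right unit, e.g.
//   Resolution.MILLISECONDS.convert(Instant.parse("2020-11-26T00:00:00Z")) // epoch milliseconds
//   Resolution.NANOSECONDS.convert(Instant.parse("2020-11-26T00:00:00Z"))  // epoch nanoseconds
// For a date_nanos field the recorded min/max are nanoseconds since the epoch.)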
assertThat(updatedTimestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY)); + } + + // Stop the node holding the searchable snapshots, and since we defined + // the index allocation criteria to require the searchable snapshot + // index to be allocated in that node, the shards should remain unassigned + internalCluster().stopNode(dataNodeHoldingSearchableSnapshot); + waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex()); + + if (includeIndexCoveringSearchRangeInSearchRequest) { + SearchResponse newSearchResponse = client().search(request).actionGet(); + + assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getFailedShards(), equalTo(0)); + assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange)); + } else { + if (indexOutsideSearchRangeShardCount == 1) { + expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet()); + } else { + SearchResponse newSearchResponse = client().search(request).actionGet(); + // When all shards are skipped, at least one of them should be queried in order to + // provide a proper search response. + assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); + assertThat(newSearchResponse.getFailedShards(), equalTo(1)); + assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount)); + } + } + } + + public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped() throws Exception { + internalCluster().startMasterOnlyNode(); + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode(); + final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode(); + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNodeHoldingSearchableSnapshot); + + final String indexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final int indexOutsideSearchRangeShardCount = randomIntBetween(1, 3); + createIndexWithTimestamp( + indexOutsideSearchRange, + indexOutsideSearchRangeShardCount, + Settings.builder() + .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex) + .build() + ); + + indexDocumentsWithTimestampWithinDate(indexOutsideSearchRange, between(0, 1000), "2020-11-26T%02d:%02d:%02d.%09dZ"); + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(repositoryName, "mock"); + + final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indexOutsideSearchRange)).snapshotId(); + + final String searchableSnapshotIndexOutsideSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + // Block the repository for the node holding the searchable snapshot shards + // to delay its restore + blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot); + + // Force the searchable snapshot to be allocated in a particular node + Settings restoredIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()) + 
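// (Aside: blockDataNode/unblockNode come from the mock repository test infrastructure; they
// stall the snapshot restore on the named node, so the mounted index's shards stay
// unassigned until the test releases the block.)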
.put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot) + .build(); + + final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest( + searchableSnapshotIndexOutsideSearchRange, + repositoryName, + snapshotId.getName(), + indexOutsideSearchRange, + restoredIndexSettings, + Strings.EMPTY_ARRAY, + false + ); + client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet(); + final int searchableSnapshotShardCount = indexOutsideSearchRangeShardCount; + + final IndexMetadata indexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange); + assertThat(indexMetadata.getTimestampMillisRange(), equalTo(IndexLongFieldRange.NO_SHARDS)); + + DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(indexMetadata.getIndex()); + assertThat(timestampFieldType, nullValue()); + + SearchRequest request = new SearchRequest().indices(indexOutsideSearchRange, searchableSnapshotIndexOutsideSearchRange) + .source( + new SearchSourceBuilder().query( + QueryBuilders.rangeQuery(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD) + .from("2020-11-28T00:00:00.000000000Z", true) + .to("2020-11-29T00:00:00.000000000Z") + ) + ); + + final int totalShards = indexOutsideSearchRangeShardCount + searchableSnapshotShardCount; + SearchResponse searchResponse = client().search(request).actionGet(); + + // All the regular index searches succeeded + assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount)); + // All the searchable snapshot shard searches failed + assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount)); + assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); + + // Allow the searchable snapshot to finally be mounted + unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot); + waitUntilRecoveryIsDone(searchableSnapshotIndexOutsideSearchRange); + ensureGreen(searchableSnapshotIndexOutsideSearchRange); + + final IndexMetadata updatedIndexMetadata = getIndexMetadata(searchableSnapshotIndexOutsideSearchRange); + final IndexLongFieldRange updatedTimestampMillisRange = updatedIndexMetadata.getTimestampMillisRange(); + final DateFieldMapper.DateFieldType dateFieldType = indicesService.getTimestampFieldType(updatedIndexMetadata.getIndex()); + assertThat(dateFieldType, notNullValue()); + final DateFieldMapper.Resolution resolution = dateFieldType.resolution(); + assertThat(updatedTimestampMillisRange.isComplete(), equalTo(true)); + assertThat(updatedTimestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertThat(updatedTimestampMillisRange.getMin(), greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-26T00:00:00Z")))); + assertThat(updatedTimestampMillisRange.getMax(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-27T00:00:00Z")))); + + // Stop the node holding the searchable snapshots, and since we defined + // the index allocation criteria to require the searchable snapshot + // index to be allocated in that node, the shards should remain unassigned + internalCluster().stopNode(dataNodeHoldingSearchableSnapshot); + waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex()); + + SearchResponse newSearchResponse = client().search(request).actionGet(); + + // All the regular
index searches succeeded + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getFailedShards(), equalTo(0)); + // We have to query at least one node to construct a valid response, and we pick + // a shard that's available in order to construct the search response + assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1)); + assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + } + + public void testSearchableSnapshotShardsThatHaveMatchingDataAreNotSkippedOnTheCoordinatingNode() throws Exception { + internalCluster().startMasterOnlyNode(); + internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode(); + final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode(); + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNodeHoldingSearchableSnapshot); + + final String indexWithinSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final int indexWithinSearchRangeShardCount = randomIntBetween(1, 3); + createIndexWithTimestamp( + indexWithinSearchRange, + indexWithinSearchRangeShardCount, + Settings.builder() + .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex) + .build() + ); + + indexDocumentsWithTimestampWithinDate(indexWithinSearchRange, between(1, 1000), "2020-11-28T%02d:%02d:%02d.%09dZ"); + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(repositoryName, "mock"); + + final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indexWithinSearchRange)).snapshotId(); + assertAcked(client().admin().indices().prepareDelete(indexWithinSearchRange)); + + final String searchableSnapshotIndexWithinSearchRange = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + + // Block the repository for the node holding the searchable snapshot shards + // to delay its restore + blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot); + + // Force the searchable snapshot to be allocated in a particular node + Settings restoredIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()) + .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot) + .build(); + + final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest( + searchableSnapshotIndexWithinSearchRange, + repositoryName, + snapshotId.getName(), + indexWithinSearchRange, + restoredIndexSettings, + Strings.EMPTY_ARRAY, + false + ); + client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet(); + + final IndexMetadata indexMetadata = getIndexMetadata(searchableSnapshotIndexWithinSearchRange); + assertThat(indexMetadata.getTimestampMillisRange(), equalTo(IndexLongFieldRange.NO_SHARDS)); + + DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(indexMetadata.getIndex()); + assertThat(timestampFieldType, nullValue()); + + SearchRequest request = new SearchRequest().indices(searchableSnapshotIndexWithinSearchRange) + .source( + new SearchSourceBuilder().query( + QueryBuilders.rangeQuery(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD) + .from("2020-11-28T00:00:00.000000000Z", 
true) + .to("2020-11-29T00:00:00.000000000Z") + ) + ); + + // All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp + // is not available yet + expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet()); + + // Allow the searchable snapshot to finally be mounted + unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot); + waitUntilRecoveryIsDone(searchableSnapshotIndexWithinSearchRange); + ensureGreen(searchableSnapshotIndexWithinSearchRange); + + final IndexMetadata updatedIndexMetadata = getIndexMetadata(searchableSnapshotIndexWithinSearchRange); + final IndexLongFieldRange updatedTimestampMillisRange = updatedIndexMetadata.getTimestampMillisRange(); + final DateFieldMapper.DateFieldType dateFieldType = indicesService.getTimestampFieldType(updatedIndexMetadata.getIndex()); + assertThat(dateFieldType, notNullValue()); + final DateFieldMapper.Resolution resolution = dateFieldType.resolution(); + assertThat(updatedTimestampMillisRange.isComplete(), equalTo(true)); + assertThat(updatedTimestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertThat(updatedTimestampMillisRange.getMin(), greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-28T00:00:00Z")))); + assertThat(updatedTimestampMillisRange.getMax(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-29T00:00:00Z")))); + + // Stop the node holding the searchable snapshots, and since we defined + // the index allocation criteria to require the searchable snapshot + // index to be allocated in that node, the shards should remain unassigned + internalCluster().stopNode(dataNodeHoldingSearchableSnapshot); + waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex()); + + // The range query matches, but the shards that hold the data are unavailable; in that case + // the search fails, as every shard with matching data is unassigned + expectThrows(SearchPhaseExecutionException.class, () -> client().search(request).actionGet()); + } + + private void createIndexWithTimestamp(String indexName, int numShards, Settings extraSettings) throws IOException { + assertAcked( + client().admin() + .indices() + .prepareCreate(indexName) + .setMapping( + XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD) + .field("type", randomFrom("date", "date_nanos")) + .field("format", "strict_date_optional_time_nanos") + .endObject() + .endObject() + .endObject() + ) + .setSettings(indexSettingsNoReplicas(numShards).put(INDEX_SOFT_DELETES_SETTING.getKey(), true).put(extraSettings)) + ); + ensureGreen(indexName); + } + + private void indexDocumentsWithTimestampWithinDate(String indexName, int docCount, String timestampTemplate) throws Exception { + final List indexRequestBuilders = new ArrayList<>(); + for (int i = 0; i < docCount; i++) { + indexRequestBuilders.add( + client().prepareIndex(indexName) + .setSource( + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, + String.format( + Locale.ROOT, + timestampTemplate, + between(0, 23), + between(0, 59), + between(0, 59), + randomLongBetween(0, 999999999L) + ) + ) + ); + } + indexRandom(true, false, indexRequestBuilders); + + assertThat( + client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(), + equalTo(0) + ); + refresh(indexName); + forceMerge(); + } + + private IndexMetadata getIndexMetadata(String indexName) { + return client().admin() + .cluster() +
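// (Aside: in getIndexMetadata, clear() strips the cluster state request down to nothing, and
// setMetadata(true) plus setIndices(indexName) adds back only the metadata of the index under
// test, keeping the response minimal. The timestamp template above fills hour/minute/second
// and a zero-padded 9-digit nanosecond component, e.g.
//   String.format(Locale.ROOT, "2020-11-28T%02d:%02d:%02d.%09dZ", 13, 5, 9, 42L)
// yields "2020-11-28T13:05:09.000000042Z", so every generated document stays inside that day.)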
.prepareState() + .clear() + .setMetadata(true) + .setIndices(indexName) + .get() + .getState() + .metadata() + .index(indexName); + } + + private void waitUntilRecoveryIsDone(String index) throws Exception { + assertBusy(() -> { + RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(index).get(); + assertThat(recoveryResponse.hasRecoveries(), equalTo(true)); + for (List value : recoveryResponse.shardRecoveryStates().values()) { + for (RecoveryState recoveryState : value) { + assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE)); + } + } + }); + } + + private void waitUntilAllShardsAreUnassigned(Index index) throws Exception { + assertBusy(() -> { + ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + IndexRoutingTable indexRoutingTable = clusterService.state().getRoutingTable().index(index); + assertThat(indexRoutingTable.allPrimaryShardsUnassigned(), equalTo(true)); + }); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 1b868daa1dc6e..abf19c5a1d977 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -54,11 +55,11 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.joda.time.Instant; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -712,6 +713,7 @@ public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMe final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final int numShards = between(1, 3); + final String dateType = randomFrom("date", "date_nanos"); assertAcked( client().admin() .indices() @@ -721,7 +723,7 @@ public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMe .startObject() .startObject("properties") .startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD) - .field("type", "date_nanos") + .field("type", dateType) .field("format", "strict_date_optional_time_nanos") .endObject() .endObject() @@ -784,8 +786,11 @@ public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMe assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY)); } else { assertThat(timestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY))); - assertThat(timestampMillisRange.getMin(), greaterThanOrEqualTo(Instant.parse("2020-11-26T00:00:00Z").getMillis())); - 
assertThat(timestampMillisRange.getMin(), lessThanOrEqualTo(Instant.parse("2020-11-27T00:00:00Z").getMillis())); + DateFieldMapper.Resolution resolution = dateType.equals("date") + ? DateFieldMapper.Resolution.MILLISECONDS + : DateFieldMapper.Resolution.NANOSECONDS; + assertThat(timestampMillisRange.getMin(), greaterThanOrEqualTo(resolution.convert(Instant.parse("2020-11-26T00:00:00Z")))); + assertThat(timestampMillisRange.getMin(), lessThanOrEqualTo(resolution.convert(Instant.parse("2020-11-27T00:00:00Z")))); } }
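The behaviour these tests pin down reduces to an interval-overlap check: a shard can be skipped without contacting its node when the timestamp range recorded in its IndexMetadata cannot intersect the query's range. A minimal, self-contained sketch of that decision, with hypothetical names (the real check happens during the coordinator-side rewrite of the range query, not in a helper like this):

    import java.time.Instant;

    final class CanMatchOverlapSketch {
        /** True when [shardMin, shardMax] intersects [from, to]; a non-intersecting shard is skippable. */
        static boolean canMatch(long shardMin, long shardMax, long from, long to) {
            return shardMax >= from && shardMin <= to;
        }

        public static void main(String[] args) {
            long from = Instant.parse("2020-11-28T00:00:00Z").toEpochMilli();
            long to = Instant.parse("2020-11-29T00:00:00Z").toEpochMilli();
            // A shard whose documents all carry 2020-11-26 timestamps, as in the tests above:
            long shardMin = Instant.parse("2020-11-26T00:00:00Z").toEpochMilli();
            long shardMax = Instant.parse("2020-11-27T00:00:00Z").toEpochMilli();
            System.out.println(canMatch(shardMin, shardMax, from, to)); // false -> skip the shard
        }
    }

Because the check needs only the metadata min/max, it works even when the shard itself is unassigned, which is exactly the situation the tests create by stopping the node that the allocation filter requires.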