Apply can match phase on coordinator when the min max field data is available at the coordinator. (#65583)

This commit introduces an optimization that allows skipping shards that
are not necessary directly on the coordinator for time-based indices.
This is possible for frozen and searchable snapshot indices, since those
store their min/max timestamp range in their IndexMetadata (introduced
in #65689). For indices that don't have that information available, the
behaviour is unchanged.
fcofdez authored Dec 15, 2020
1 parent 349194d commit 32200b1
Showing 17 changed files with 1,312 additions and 63 deletions.
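
In essence, the optimization described above lets the coordinator compare a range query's bounds against the index-wide timestamp range recorded in IndexMetadata, and skip every shard of an index whose data cannot possibly match. A minimal sketch of that comparison, illustrative only (the real logic lives in RangeQueryBuilder#getRelation and DateFieldType#isFieldWithinQuery in the diff below; the enum here just mirrors MappedFieldType.Relation):

    // Illustrative sketch, not part of this commit.
    enum Relation { WITHIN, INTERSECTS, DISJOINT }

    static Relation relate(long indexMin, long indexMax, long from, long to) {
        if (indexMax < from || indexMin > to) {
            return Relation.DISJOINT;    // no shard of this index can match -> skip them all
        }
        if (indexMin >= from && indexMax <= to) {
            return Relation.WITHIN;      // every document in the index matches the range
        }
        return Relation.INTERSECTS;      // undecidable on the coordinator, fall back to per-shard can_match
    }
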
File: org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -237,7 +237,7 @@ void skipShard(SearchShardIterator iterator) {
         successfulShardExecution(iterator);
     }

-    private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
+    protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
         /*
          * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
          * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
@@ -306,11 +306,11 @@ protected abstract void executePhaseOnShard(SearchShardIterator shardIt,
                                                 SearchShardTarget shard,
                                                 SearchActionListener<Result> listener);

-    private void fork(final Runnable runnable) {
+    protected void fork(final Runnable runnable) {
         executor.execute(new AbstractRunnable() {
             @Override
             public void onFailure(Exception e) {
-
+                assert false : "Unexpected failure";
             }

             @Override
@@ -529,7 +529,11 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
     private void successfulShardExecution(SearchShardIterator shardsIt) {
         final int remainingOpsOnIterator;
         if (shardsIt.skip()) {
-            remainingOpsOnIterator = shardsIt.remaining();
+            // It's possible that we're skipping a shard that's unavailable
+            // but whose range was available in the IndexMetadata; in that
+            // case shardsIt.remaining() would be 0, and expectedTotalOps
+            // accounts for unavailable shards too.
+            remainingOpsOnIterator = Math.max(shardsIt.remaining(), 1);
         } else {
             remainingOpsOnIterator = shardsIt.remaining() + 1;
         }
File: org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
@@ -24,10 +24,14 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.index.query.CoordinatorRewriteContext;
+import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchService.CanMatchResponse;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.MinAndMax;
 import org.elasticsearch.search.sort.SortOrder;
@@ -58,6 +62,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {

     private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
     private final GroupShardsIterator<SearchShardIterator> shardsIts;
+    private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider;

     CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
                                  BiFunction<String, String, Transport.Connection> nodeIdToConnection,
@@ -66,13 +71,14 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {
                                  ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
                                  TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState,
                                  SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
-                                 SearchResponse.Clusters clusters) {
+                                 SearchResponse.Clusters clusters, CoordinatorRewriteContextProvider coordinatorRewriteContextProvider) {
         //We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
         super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts,
             executor, request, listener, shardsIts, timeProvider, clusterState, task,
             new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
         this.phaseFactory = phaseFactory;
         this.shardsIts = shardsIts;
+        this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
     }

     @Override
@@ -100,7 +106,17 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhaseResults results,
         if (cardinality == 0) {
             // this is a special case where we have no hit but we need to get at least one search response in order
             // to produce a valid search result with all the aggs etc.
-            possibleMatches.set(0);
+            // Since it's possible that some of the shards that we're skipping are
+            // unavailable, we try to query a node that has at least one shard
+            // copy available in order to produce a valid search result.
+            int shardIndexToQuery = 0;
+            for (int i = 0; i < shardsIts.size(); i++) {
+                if (shardsIts.get(i).size() > 0) {
+                    shardIndexToQuery = i;
+                    break;
+                }
+            }
+            possibleMatches.set(shardIndexToQuery);
         }
         SearchSourceBuilder source = getRequest().source();
         int i = 0;
@@ -118,6 +134,40 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhaseResults results,
         return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()));
     }

+    @Override
+    protected void performPhaseOnShard(int shardIndex, SearchShardIterator shardIt, SearchShardTarget shard) {
+        CoordinatorRewriteContext coordinatorRewriteContext =
+            coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardIt.shardId().getIndex());
+
+        if (coordinatorRewriteContext == null) {
+            super.performPhaseOnShard(shardIndex, shardIt, shard);
+            return;
+        }
+
+        try {
+            ShardSearchRequest request = buildShardSearchRequest(shardIt, shardIndex);
+            boolean canMatch = SearchService.queryStillMatchesAfterRewrite(request, coordinatorRewriteContext);
+
+            // Trigger the query as there's still a chance that we can skip
+            // this shard given other query filters that we cannot apply
+            // on the coordinator
+            if (canMatch) {
+                super.performPhaseOnShard(shardIndex, shardIt, shard);
+                return;
+            }
+
+            CanMatchResponse result = new CanMatchResponse(canMatch, null);
+            result.setSearchShardTarget(shard == null ? new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias(),
+                shardIt.getOriginalIndices()) : shard);
+            result.setShardIndex(shardIndex);
+            fork(() -> onShardResult(result, shardIt));
+        } catch (Exception e) {
+            // If we fail to rewrite the query on the coordinator, just try to
+            // execute it on the shard as usual.
+            super.performPhaseOnShard(shardIndex, shardIt, shard);
+        }
+    }
+
     private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
                                                         MinAndMax<?>[] minAndMaxes,
                                                         SortOrder order) {
File: org/elasticsearch/action/search/TransportSearchAction.java
@@ -40,8 +40,8 @@
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -787,7 +787,7 @@ public void run() {
                         action.start();
                     }
                 };
-            }, clusters);
+            }, clusters, searchService.getCoordinatorRewriteContextProvider(timeProvider::getAbsoluteStartMillis));
         } else {
             final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(executor,
                 circuitBreaker, task.getProgressListener(), searchRequest, shardIterators.size(),
File: org/elasticsearch/index/mapper/DateFieldMapper.java
@@ -487,6 +487,20 @@ public Query distanceFeatureQuery(Object origin, String pivot, QueryShardContext
         public Relation isFieldWithinQuery(IndexReader reader,
                                            Object from, Object to, boolean includeLower, boolean includeUpper,
                                            ZoneId timeZone, DateMathParser dateParser, QueryRewriteContext context) throws IOException {
+            if (PointValues.size(reader, name()) == 0) {
+                // no points, so nothing matches
+                return Relation.DISJOINT;
+            }
+
+            long minValue = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, name()), 0);
+            long maxValue = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, name()), 0);
+
+            return isFieldWithinQuery(minValue, maxValue, from, to, includeLower, includeUpper, timeZone, dateParser, context);
+        }
+
+        public Relation isFieldWithinQuery(long minValue, long maxValue,
+                                           Object from, Object to, boolean includeLower, boolean includeUpper,
+                                           ZoneId timeZone, DateMathParser dateParser, QueryRewriteContext context) throws IOException {
             if (dateParser == null) {
                 if (from instanceof Number || to instanceof Number) {
                     // force epoch_millis
@@ -518,14 +532,6 @@ public Relation isFieldWithinQuery(IndexReader reader,
                 }
             }

-            if (PointValues.size(reader, name()) == 0) {
-                // no points, so nothing matches
-                return Relation.DISJOINT;
-            }
-
-            long minValue = LongPoint.decodeDimension(PointValues.getMinPackedValue(reader, name()), 0);
-            long maxValue = LongPoint.decodeDimension(PointValues.getMaxPackedValue(reader, name()), 0);
-
             if (minValue >= fromInclusive && maxValue <= toInclusive) {
                 return Relation.WITHIN;
             } else if (maxValue < fromInclusive || minValue > toInclusive) {
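
This reshuffling is what enables the coordinator-side check: the original method needed an IndexReader (an open shard on a data node) to read the field's min/max points, while the extracted overload accepts those bounds directly, so the coordinator can pass in the values recorded in IndexMetadata. A hedged sketch of a call site (the bounds and date strings are invented; dateFieldType and queryRewriteContext are assumed to be in scope):

    // Sketch only: bounds come from IndexMetadata instead of an IndexReader.
    MappedFieldType.Relation relation = dateFieldType.isFieldWithinQuery(
        minTimestamp, maxTimestamp,      // index-level bounds, no reader required
        "2020-11-01", "2020-12-01",      // the query's from/to
        true, false,                     // includeLower / includeUpper
        null,                            // timeZone
        null,                            // dateParser: null falls back to the field's own parser
        queryRewriteContext);
    if (relation == MappedFieldType.Relation.DISJOINT) {
        // nothing in this index can match; its shards can be skipped outright
    }
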
File: org/elasticsearch/index/query/CoordinatorRewriteContext.java (new file)
@@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.query;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.IndexLongFieldRange;

import java.util.function.LongSupplier;

/**
 * Context object used to rewrite {@link QueryBuilder} instances into a simplified version on the coordinator.
 * Instances of this object rely on information stored in the {@code IndexMetadata} for certain indices.
 * Right now this context is able to rewrite range queries that include a known timestamp field
 * (i.e. the timestamp field for data streams) into a MatchNoneQueryBuilder and skip the shards that
 * don't hold queried data. See IndexMetadata#getTimestampMillisRange() for more details.
 */
public class CoordinatorRewriteContext extends QueryRewriteContext {
    private final Index index;
    private final IndexLongFieldRange indexLongFieldRange;
    private final DateFieldMapper.DateFieldType timestampFieldType;

    public CoordinatorRewriteContext(NamedXContentRegistry xContentRegistry,
                                     NamedWriteableRegistry writeableRegistry,
                                     Client client,
                                     LongSupplier nowInMillis,
                                     Index index,
                                     IndexLongFieldRange indexLongFieldRange,
                                     DateFieldMapper.DateFieldType timestampFieldType) {
        super(xContentRegistry, writeableRegistry, client, nowInMillis);
        this.index = index;
        this.indexLongFieldRange = indexLongFieldRange;
        this.timestampFieldType = timestampFieldType;
    }

    long getMinTimestamp() {
        return indexLongFieldRange.getMin();
    }

    long getMaxTimestamp() {
        return indexLongFieldRange.getMax();
    }

    boolean hasTimestampData() {
        return indexLongFieldRange.isComplete() && indexLongFieldRange != IndexLongFieldRange.EMPTY;
    }

    @Nullable
    public MappedFieldType getFieldType(String fieldName) {
        if (fieldName.equals(timestampFieldType.name()) == false) {
            return null;
        }

        return timestampFieldType;
    }

    @Override
    public CoordinatorRewriteContext convertToCoordinatorRewriteContext() {
        return this;
    }
}
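
Note the deliberately narrow contract of getFieldType above: it resolves only the one known timestamp field, which is what restricts the coordinator-level rewrite to queries on that field. For example (the field name here is hypothetical):

    // Sketch only: a range query on any other field cannot be rewritten on
    // the coordinator and falls through to the regular per-shard phase.
    MappedFieldType fieldType = coordinatorRewriteContext.getFieldType("some_keyword_field");
    assert fieldType == null;    // not the timestamp field -> no coordinator rewrite
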
File: org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java (new file)
@@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.query;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;

import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class CoordinatorRewriteContextProvider {
    private final NamedXContentRegistry xContentRegistry;
    private final NamedWriteableRegistry writeableRegistry;
    private final Client client;
    private final LongSupplier nowInMillis;
    private final Supplier<ClusterState> clusterStateSupplier;
    private final Function<Index, DateFieldMapper.DateFieldType> mappingSupplier;

    public CoordinatorRewriteContextProvider(NamedXContentRegistry xContentRegistry,
                                             NamedWriteableRegistry writeableRegistry,
                                             Client client,
                                             LongSupplier nowInMillis,
                                             Supplier<ClusterState> clusterStateSupplier,
                                             Function<Index, DateFieldMapper.DateFieldType> mappingSupplier) {
        this.xContentRegistry = xContentRegistry;
        this.writeableRegistry = writeableRegistry;
        this.client = client;
        this.nowInMillis = nowInMillis;
        this.clusterStateSupplier = clusterStateSupplier;
        this.mappingSupplier = mappingSupplier;
    }

    @Nullable
    public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
        ClusterState clusterState = clusterStateSupplier.get();
        IndexMetadata indexMetadata = clusterState.metadata().index(index);

        if (indexMetadata == null || indexMetadata.getTimestampMillisRange().containsAllShardRanges() == false) {
            return null;
        }

        DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);

        if (dateFieldType == null) {
            return null;
        }

        IndexLongFieldRange timestampMillisRange = indexMetadata.getTimestampMillisRange();
        return new CoordinatorRewriteContext(xContentRegistry,
            writeableRegistry,
            client,
            nowInMillis,
            index,
            timestampMillisRange,
            dateFieldType
        );
    }
}
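
getCoordinatorRewriteContext returns null unless the index metadata carries a complete timestamp range and the timestamp field's mapping is known, and callers treat null as "fall back to the old behaviour". A sketch of that calling pattern, mirroring CanMatchPreFilterSearchPhase above (the two helpers are hypothetical):

    // Sketch only, assuming provider, index and request are in scope.
    CoordinatorRewriteContext ctx = provider.getCoordinatorRewriteContext(index);
    if (ctx == null) {
        // No complete min/max range in IndexMetadata (e.g. a regular hot index):
        // send the can_match request to the shard as before.
        sendCanMatchRequest(request);    // hypothetical fallback helper
    } else if (SearchService.queryStillMatchesAfterRewrite(request, ctx) == false) {
        markShardAsSkipped(request);     // hypothetical: skipped without any network round trip
    }
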
File: org/elasticsearch/index/query/QueryRewriteContext.java
@@ -75,6 +75,10 @@ public QueryShardContext convertToShardContext() {
         return null;
     }

+    public CoordinatorRewriteContext convertToCoordinatorRewriteContext() {
+        return null;
+    }
+
     /**
      * Registers an async action that must be executed before the next rewrite round in order to make progress.
      * This should be used if a rewriteable needs to fetch some external resources in order to be executed, i.e. a document
File: org/elasticsearch/index/query/RangeQueryBuilder.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.common.time.DateMathParser;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;

@@ -428,6 +429,22 @@ public String getWriteableName() {

     // Overridable for testing only
     protected MappedFieldType.Relation getRelation(QueryRewriteContext queryRewriteContext) throws IOException {
+        CoordinatorRewriteContext coordinatorRewriteContext = queryRewriteContext.convertToCoordinatorRewriteContext();
+        if (coordinatorRewriteContext != null) {
+            final MappedFieldType fieldType = coordinatorRewriteContext.getFieldType(fieldName);
+            if (fieldType instanceof DateFieldMapper.DateFieldType) {
+                final DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) fieldType;
+                if (coordinatorRewriteContext.hasTimestampData() == false) {
+                    return MappedFieldType.Relation.DISJOINT;
+                }
+                long minTimestamp = coordinatorRewriteContext.getMinTimestamp();
+                long maxTimestamp = coordinatorRewriteContext.getMaxTimestamp();
+                DateMathParser dateMathParser = getForceDateParser();
+                return dateFieldType.isFieldWithinQuery(minTimestamp, maxTimestamp, from, to, includeLower,
+                    includeUpper, timeZone, dateMathParser, queryRewriteContext);
+            }
+        }
+
         QueryShardContext shardContext = queryRewriteContext.convertToShardContext();
         if (shardContext != null) {
             final MappedFieldType fieldType = shardContext.getFieldType(fieldName);
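
To make the effect concrete: if an index's IndexMetadata records a timestamp range of, say, 2020-01-01 through 2020-03-31 and a search filters @timestamp to 2020-06-01..2020-07-01, the block above returns Relation.DISJOINT, and the existing rewrite logic in RangeQueryBuilder turns the query into a MatchNoneQueryBuilder, so the can_match phase can skip all of that index's shards without contacting a data node. (The dates are invented for illustration; the DISJOINT-to-MatchNoneQueryBuilder rewrite is pre-existing RangeQueryBuilder behaviour that this change reuses on the coordinator.)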