Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node level can match action #78765

Merged
merged 38 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
76b467a
WIP
ywelsch Sep 30, 2021
07e9f51
happy path
ywelsch Sep 30, 2021
53d1071
more work
ywelsch Oct 4, 2021
2263bbd
more edits
ywelsch Oct 4, 2021
66d2095
more stuff
ywelsch Oct 5, 2021
4a8bf13
CCS multi-version test passing
ywelsch Oct 5, 2021
a69bcbc
unit tests
ywelsch Oct 6, 2021
fe56d5a
simpler diff?
ywelsch Oct 6, 2021
75e2bb5
more renaming
ywelsch Oct 6, 2021
3e1d6e1
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch Oct 6, 2021
44b5783
single list
ywelsch Oct 6, 2021
a65bd63
reset prefiltter settings
ywelsch Oct 6, 2021
41b61bd
fix serialzation
ywelsch Oct 6, 2021
dd93c93
thread pool
ywelsch Oct 7, 2021
1a718cf
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch Oct 7, 2021
36bb4a1
javadoc
ywelsch Oct 11, 2021
9bf0259
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch Oct 11, 2021
da5707f
merge conflict
ywelsch Oct 11, 2021
ea79ae2
docs
ywelsch Oct 12, 2021
9c3196f
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch Oct 12, 2021
5a81c07
remove dead code
ywelsch Oct 14, 2021
e26160f
avoid cast
ywelsch Oct 14, 2021
02c7193
simpler and rename
ywelsch Oct 14, 2021
ca49aa8
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch Oct 14, 2021
9670258
checkstyle
ywelsch Oct 14, 2021
803068b
cosmetics
ywelsch Oct 14, 2021
a3457ef
original indices
ywelsch Oct 14, 2021
d032fef
Add can match qa test
dnhatn Oct 14, 2021
24baa8b
implement indicesrequest
ywelsch Oct 14, 2021
171e530
Merge branch 'node-level-can-match' of github.com:ywelsch/elasticsear…
ywelsch Oct 14, 2021
259f049
merge originalindices
ywelsch Oct 14, 2021
16f45ed
better
ywelsch Oct 14, 2021
c0e35ee
radnomize use of can-match phase
ywelsch Oct 15, 2021
bf6b194
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch Oct 15, 2021
ade3b7b
Merge branch 'master' into node-level-can-match
elasticmachine Oct 15, 2021
cc83808
rename
ywelsch Oct 18, 2021
aada13a
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch Oct 18, 2021
c1cc951
Merge branch 'node-level-can-match' of github.com:ywelsch/elasticsear…
ywelsch Oct 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
Expand Down Expand Up @@ -65,7 +63,6 @@
*/
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase implements SearchPhaseContext {
private static final float DEFAULT_INDEX_BOOST = 1.0f;
private static final long[] EMPTY_LONG_ARRAY = new long[0];
private final Logger logger;
private final SearchTransportService searchTransportService;
private final Executor executor;
Expand Down Expand Up @@ -736,21 +733,9 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
final Map<String, long[]> indexToWaitForCheckpoints = request.getWaitForCheckpoints();
final TimeValue waitForCheckpointsTimeout = request.getWaitForCheckpointsTimeout();
final long[] waitForCheckpoints = indexToWaitForCheckpoints.getOrDefault(shardIt.shardId().getIndex().getName(), EMPTY_LONG_ARRAY);

long waitForCheckpoint;
if (waitForCheckpoints.length == 0) {
waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
} else {
assert waitForCheckpoints.length > shardIndex;
waitForCheckpoint = waitForCheckpoints[shardIndex];
}
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request,
shardIt.shardId(), shardIndex, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(),
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), waitForCheckpoint,
waitForCheckpointsTimeout);
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,10 @@ public CanMatchRequest.Shard buildShardLevelRequest(SearchShardIterator shardIt)
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
int shardRequestIndex = shardItIndexMap.get(shardIt);
return new CanMatchRequest.Shard(shardIt.getOriginalIndices().indices(), shardIt.shardId(),
shardItIndexMap.get(shardIt), filter, indexBoost, shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
shardRequestIndex, filter, indexBoost, shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(),
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex));
}

private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> shardsIts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class CanMatchRequest extends TransportRequest implements IndicesRequest
private final String clusterAlias;
private final String[] indices;
private final IndicesOptions indicesOptions;
private final TimeValue waitForCheckpointsTimeout;

public static class Shard implements Writeable {
private final String[] indices;
Expand All @@ -59,21 +60,24 @@ public static class Shard implements Writeable {
private final float indexBoost;
private final ShardSearchContextId readerId;
private final TimeValue keepAlive;
private final long waitForCheckpoint;

public Shard(String[] indices,
ShardId shardId,
int shardRequestIndex,
AliasFilter aliasFilter,
float indexBoost,
ShardSearchContextId readerId,
TimeValue keepAlive) {
TimeValue keepAlive,
long waitForCheckpoint) {
this.indices = indices;
this.shardId = shardId;
this.shardRequestIndex = shardRequestIndex;
this.aliasFilter = aliasFilter;
this.indexBoost = indexBoost;
this.readerId = readerId;
this.keepAlive = keepAlive;
this.waitForCheckpoint = waitForCheckpoint;
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
}

Expand All @@ -85,6 +89,7 @@ public Shard(StreamInput in) throws IOException {
indexBoost = in.readFloat();
readerId = in.readOptionalWriteable(ShardSearchContextId::new);
keepAlive = in.readOptionalTimeValue();
waitForCheckpoint = in.readLong();
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
}

Expand All @@ -97,6 +102,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeFloat(indexBoost);
out.writeOptionalWriteable(readerId);
out.writeOptionalTimeValue(keepAlive);
out.writeLong(waitForCheckpoint);
}

public int getShardRequestIndex() {
Expand Down Expand Up @@ -133,6 +139,7 @@ public CanMatchRequest(
this.numberOfShards = numberOfShards;
this.nowInMillis = nowInMillis;
this.clusterAlias = clusterAlias;
this.waitForCheckpointsTimeout = searchRequest.getWaitForCheckpointsTimeout();
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
.toArray(String[]::new);
}
Expand All @@ -148,6 +155,7 @@ public CanMatchRequest(StreamInput in) throws IOException {
numberOfShards = in.readVInt();
nowInMillis = in.readVLong();
clusterAlias = in.readOptionalString();
waitForCheckpointsTimeout = in.readTimeValue();
shards = in.readList(Shard::new);
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
.toArray(String[]::new);
Expand All @@ -165,6 +173,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numberOfShards);
out.writeVLong(nowInMillis);
out.writeOptionalString(clusterAlias);
out.writeTimeValue(waitForCheckpointsTimeout);
out.writeList(shards);
}

Expand All @@ -180,7 +189,7 @@ public ShardSearchRequest createShardSearchRequest(Shard r) {
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
new OriginalIndices(r.indices, indicesOptions), r.shardId, r.shardRequestIndex, numberOfShards, searchType,
source, requestCache, r.aliasFilter, r.indexBoost, allowPartialSearchResults, scroll,
nowInMillis, clusterAlias, r.readerId, r.keepAlive
nowInMillis, clusterAlias, r.readerId, r.keepAlive, r.waitForCheckpoint, waitForCheckpointsTimeout
);
shardSearchRequest.setParentTask(getParentTask());
return shardSearchRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(
"action.search.shard_count.limit", Long.MAX_VALUE, 1L, Property.Dynamic, Property.NodeScope);

public static final Setting<Integer> DEFAULT_PRE_FILTER_SHARD_SIZE = Setting.intSetting(
"action.search.pre_filter_shard_size.default", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE, 1, Property.NodeScope);

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final SearchTransportService searchTransportService;
Expand All @@ -115,6 +118,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreaker circuitBreaker;
private final ExecutorSelector executorSelector;
private final int defaultPreFilterShardSize;

@Inject
public TransportSearchAction(ThreadPool threadPool,
Expand All @@ -140,6 +144,7 @@ public TransportSearchAction(ThreadPool threadPool,
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.namedWriteableRegistry = namedWriteableRegistry;
this.executorSelector = executorSelector;
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(clusterService.getSettings());
}

private Map<String, OriginalIndices> buildPerIndexOriginalIndices(ClusterState clusterState,
Expand Down Expand Up @@ -741,7 +746,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
nodes::get, remoteConnections, searchTransportService::getConnection);
final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices);
final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size());
localShardIterators.size() + remoteShardIterators.size(), defaultPreFilterShardSize);
searchAsyncActionProvider.asyncSearchAction(
task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener,
Expand Down Expand Up @@ -788,14 +793,15 @@ static BiFunction<String, String, Transport.Connection> buildConnectionLookup(St
static boolean shouldPreFilterSearchShards(ClusterState clusterState,
SearchRequest searchRequest,
String[] indices,
int numShards) {
int numShards,
int defaultPreFilterShardSize) {
SearchSourceBuilder source = searchRequest.source();
Integer preFilterShardSize = searchRequest.getPreFilterShardSize();
if (preFilterShardSize == null
&& (hasReadOnlyIndices(indices, clusterState) || hasPrimaryFieldSort(source))) {
preFilterShardSize = 1;
} else if (preFilterShardSize == null) {
preFilterShardSize = SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE;
preFilterShardSize = defaultPreFilterShardSize;
}
return searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
&& (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.search.CanMatchResponse;
import org.elasticsearch.action.search.CanMatchRequest;
import org.elasticsearch.action.search.CanMatchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.action.search.SearchType;
Expand All @@ -30,10 +31,6 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -28,14 +26,16 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.AliasFilterParsingException;
Expand Down Expand Up @@ -113,34 +113,6 @@ public ShardSearchRequest(OriginalIndices originalIndices,
@Nullable String clusterAlias,
ShardSearchContextId readerId,
TimeValue keepAlive) {
this(originalIndices,
searchRequest,
shardId,
shardRequestIndex,
numberOfShards,
aliasFilter,
indexBoost,
nowInMillis,
clusterAlias,
readerId,
keepAlive,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SearchService.NO_TIMEOUT);
}

public ShardSearchRequest(OriginalIndices originalIndices,
SearchRequest searchRequest,
ShardId shardId,
int shardRequestIndex,
int numberOfShards,
AliasFilter aliasFilter,
float indexBoost,
long nowInMillis,
@Nullable String clusterAlias,
ShardSearchContextId readerId,
TimeValue keepAlive,
long waitForCheckpoint,
TimeValue waitForCheckpointsTimeout) {
this(originalIndices,
shardId,
shardRequestIndex,
Expand All @@ -156,13 +128,28 @@ public ShardSearchRequest(OriginalIndices originalIndices,
clusterAlias,
readerId,
keepAlive,
waitForCheckpoint,
waitForCheckpointsTimeout);
computeWaitForCheckpoint(searchRequest.getWaitForCheckpoints(), shardId, shardRequestIndex),
searchRequest.getWaitForCheckpointsTimeout());
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
assert searchRequest.allowPartialSearchResults() != null;
}

private static final long[] EMPTY_LONG_ARRAY = new long[0];

public static long computeWaitForCheckpoint(Map<String, long[]> indexToWaitForCheckpoints, ShardId shardId, int shardRequestIndex) {
final long[] waitForCheckpoints = indexToWaitForCheckpoints.getOrDefault(shardId.getIndex().getName(), EMPTY_LONG_ARRAY);

long waitForCheckpoint;
if (waitForCheckpoints.length == 0) {
waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
} else {
assert waitForCheckpoints.length > shardRequestIndex;
waitForCheckpoint = waitForCheckpoints[shardRequestIndex];
}
return waitForCheckpoint;
}

public ShardSearchRequest(ShardId shardId,
long nowInMillis,
AliasFilter aliasFilter) {
Expand Down
Loading