Node level can match action (#78765) (#79344)
Changes can-match from a shard-level to a node-level action, which helps avoid the explosion of shard-level
can-match subrequests that can cause stability issues in clusters with many shards. Also introduces a new
search_coordination thread pool to handle the sending and processing of node-level can-match requests.
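As a minimal sketch of the idea behind the change (hypothetical names, not the actual CanMatchPreFilterSearchPhase code): instead of one transport round-trip per shard, the coordinating node groups the per-shard can-match checks by the node that holds each shard and sends one batched request per node.

// Hypothetical sketch only: group per-shard can-match checks by target node so that
// a single batched request is sent per node instead of one subrequest per shard.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CanMatchGroupingSketch {
    record ShardTarget(String nodeId, String index, int shardId) {}

    static Map<String, List<ShardTarget>> groupByNode(List<ShardTarget> shards) {
        Map<String, List<ShardTarget>> perNode = new HashMap<>();
        for (ShardTarget shard : shards) {
            perNode.computeIfAbsent(shard.nodeId(), n -> new ArrayList<>()).add(shard);
        }
        return perNode; // one node-level request per map entry, instead of one per shard
    }
}

With N shards spread over M data nodes this reduces the number of can-match round-trips from N to at most M.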
ywelsch authored Oct 18, 2021
1 parent bf3fcbb commit 88ed45c
Showing 20 changed files with 1,172 additions and 432 deletions.
5 changes: 5 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
@@ -23,6 +23,11 @@ There are several thread pools, but the important ones include:
Thread pool type is `fixed_auto_queue_size` with a size of `1`, and initial
queue_size of `100`.

`search_coordination`::
For lightweight search-related coordination operations. Thread pool type is
`fixed` with a size of `min(5, (`<<node.processors,
`# of allocated processors`>>`) / 2)`, and queue_size of `1000`.

`get`::
For get operations. Thread pool type is `fixed`
with a size of <<node.processors, `# of allocated processors`>>,
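As a worked example of the sizing rule for the new search_coordination pool described above, a minimal sketch (the lower bound of one thread and the integer division are assumptions, not taken from the docs):

// Sketch of the documented sizing rule: size = min(5, (# of allocated processors) / 2).
// The lower bound of 1 is an assumption here.
class SearchCoordinationPoolSizeSketch {
    static int poolSize(int allocatedProcessors) {
        return Math.max(1, Math.min(5, allocatedProcessors / 2));
    }
    // poolSize(16) == 5, poolSize(4) == 2, poolSize(1) == 1
}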
@@ -89,7 +89,7 @@ static int indexDocs(RestHighLevelClient client, String index, int numDocs) thro
return numDocs;
}

void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs) {
void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs, Integer preFilterShardSize) {
try (RestHighLevelClient localClient = newLocalClient(LOGGER)) {
Request request = new Request("POST", "/_search");
final int expectedDocs;
@@ -103,6 +103,12 @@ void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int r
if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_0_0)) {
request.addParameter("ccs_minimize_roundtrips", Boolean.toString(randomBoolean()));
}
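// pre_filter_shard_size is the shard-count threshold above which the can-match (pre-filter) round-trip runs;
// testCanMatch below passes a low value so these searches exercise the can-match phase.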
if (preFilterShardSize == null && randomBoolean()) {
preFilterShardSize = randomIntBetween(1, 100);
}
if (preFilterShardSize != null) {
request.addParameter("pre_filter_shard_size", Integer.toString(preFilterShardSize));
}
int size = between(1, 100);
request.setJsonEntity("{\"sort\": \"f\", \"size\": " + size + "}");
Response response = localClient.getLowLevelClient().performRequest(request);
@@ -142,7 +148,32 @@ public void testBWCSearchStates() throws Exception {
configureRemoteClusters(remoteNodes, CLUSTER_ALIAS, UPGRADE_FROM_VERSION, LOGGER);
int iterations = between(1, 20);
for (int i = 0; i < iterations; i++) {
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs);
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, null);
}
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);
}
}

public void testCanMatch() throws Exception {
String localIndex = "test_can_match_local_index";
String remoteIndex = "test_can_match_remote_index";
try (RestHighLevelClient localClient = newLocalClient(LOGGER);
RestHighLevelClient remoteClient = newRemoteClient()) {
localClient.indices().create(new CreateIndexRequest(localIndex)
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
RequestOptions.DEFAULT);
int localNumDocs = indexDocs(localClient, localIndex, between(10, 100));

remoteClient.indices().create(new CreateIndexRequest(remoteIndex)
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
RequestOptions.DEFAULT);
int remoteNumDocs = indexDocs(remoteClient, remoteIndex, between(10, 100));

configureRemoteClusters(getNodes(remoteClient.getLowLevelClient()), CLUSTER_ALIAS, UPGRADE_FROM_VERSION, LOGGER);
int iterations = between(1, 10);
for (int i = 0; i < iterations; i++) {
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, between(1, 10));
}
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);
@@ -724,7 +724,11 @@ private static void assumeMultiClusterSetup() {
private static SearchRequest initSearchRequest() {
List<String> indices = Arrays.asList(INDEX_NAME, "my_remote_cluster:" + INDEX_NAME);
Collections.shuffle(indices, random());
return new SearchRequest(indices.toArray(new String[0]));
final SearchRequest request = new SearchRequest(indices.toArray(new String[0]));
if (randomBoolean()) {
request.setPreFilterShardSize(between(1, 20));
}
return request;
}

private static void duelSearch(SearchRequest searchRequest, Consumer<SearchResponse> responseChecker) throws Exception {
@@ -23,12 +23,10 @@
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
@@ -65,7 +63,6 @@
*/
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase implements SearchPhaseContext {
private static final float DEFAULT_INDEX_BOOST = 1.0f;
private static final long[] EMPTY_LONG_ARRAY = new long[0];
private final Logger logger;
private final SearchTransportService searchTransportService;
private final Executor executor;
@@ -736,21 +733,9 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
final Map<String, long[]> indexToWaitForCheckpoints = request.getWaitForCheckpoints();
final TimeValue waitForCheckpointsTimeout = request.getWaitForCheckpointsTimeout();
final long[] waitForCheckpoints = indexToWaitForCheckpoints.getOrDefault(shardIt.shardId().getIndex().getName(), EMPTY_LONG_ARRAY);

long waitForCheckpoint;
if (waitForCheckpoints.length == 0) {
waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
} else {
assert waitForCheckpoints.length > shardIndex;
waitForCheckpoint = waitForCheckpoints[shardIndex];
}
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request,
shardIt.shardId(), shardIndex, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(),
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), waitForCheckpoint,
waitForCheckpointsTimeout);
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
@@ -0,0 +1,223 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Node-level request used during the can-match phase. Batches the shard-level can-match checks for all
* shards of a search that are allocated on the same node into a single transport request.
*/
public class CanMatchNodeRequest extends TransportRequest implements IndicesRequest {

private final SearchSourceBuilder source;
private final List<Shard> shards;
private final SearchType searchType;
private final String[] types;
private final Boolean requestCache;
private final boolean allowPartialSearchResults;
private final Scroll scroll;
private final int numberOfShards;
private final long nowInMillis;
@Nullable
private final String clusterAlias;
private final String[] indices;
private final IndicesOptions indicesOptions;
private final TimeValue waitForCheckpointsTimeout;

public static class Shard implements Writeable {
private final String[] indices;
private final ShardId shardId;
private final int shardRequestIndex;
private final AliasFilter aliasFilter;
private final float indexBoost;
private final ShardSearchContextId readerId;
private final TimeValue keepAlive;
private final long waitForCheckpoint;

public Shard(String[] indices,
ShardId shardId,
int shardRequestIndex,
AliasFilter aliasFilter,
float indexBoost,
ShardSearchContextId readerId,
TimeValue keepAlive,
long waitForCheckpoint) {
this.indices = indices;
this.shardId = shardId;
this.shardRequestIndex = shardRequestIndex;
this.aliasFilter = aliasFilter;
this.indexBoost = indexBoost;
this.readerId = readerId;
this.keepAlive = keepAlive;
this.waitForCheckpoint = waitForCheckpoint;
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
}

public Shard(StreamInput in) throws IOException {
indices = in.readStringArray();
shardId = new ShardId(in);
shardRequestIndex = in.readVInt();
aliasFilter = new AliasFilter(in);
indexBoost = in.readFloat();
readerId = in.readOptionalWriteable(ShardSearchContextId::new);
keepAlive = in.readOptionalTimeValue();
waitForCheckpoint = in.readLong();
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
shardId.writeTo(out);
out.writeVInt(shardRequestIndex);
aliasFilter.writeTo(out);
out.writeFloat(indexBoost);
out.writeOptionalWriteable(readerId);
out.writeOptionalTimeValue(keepAlive);
out.writeLong(waitForCheckpoint);
}

public int getShardRequestIndex() {
return shardRequestIndex;
}

public String[] getOriginalIndices() {
return indices;
}

public ShardId shardId() {
return shardId;
}
}

public CanMatchNodeRequest(
SearchRequest searchRequest,
IndicesOptions indicesOptions,
List<Shard> shards,
int numberOfShards,
long nowInMillis,
@Nullable String clusterAlias
) {
this.source = searchRequest.source();
this.indicesOptions = indicesOptions;
this.shards = new ArrayList<>(shards);
this.searchType = searchRequest.searchType();
this.types = searchRequest.types();
this.requestCache = searchRequest.requestCache();
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
assert searchRequest.allowPartialSearchResults() != null;
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults();
this.scroll = searchRequest.scroll();
this.numberOfShards = numberOfShards;
this.nowInMillis = nowInMillis;
this.clusterAlias = clusterAlias;
this.waitForCheckpointsTimeout = searchRequest.getWaitForCheckpointsTimeout();
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
.toArray(String[]::new);
}

public CanMatchNodeRequest(StreamInput in) throws IOException {
super(in);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
indicesOptions = IndicesOptions.readIndicesOptions(in);
searchType = SearchType.fromId(in.readByte());
types = in.readStringArray();
scroll = in.readOptionalWriteable(Scroll::new);
requestCache = in.readOptionalBoolean();
allowPartialSearchResults = in.readBoolean();
numberOfShards = in.readVInt();
nowInMillis = in.readVLong();
clusterAlias = in.readOptionalString();
waitForCheckpointsTimeout = in.readTimeValue();
shards = in.readList(Shard::new);
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
.toArray(String[]::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(source);
indicesOptions.writeIndicesOptions(out);
out.writeByte(searchType.id());
out.writeStringArray(types);
out.writeOptionalWriteable(scroll);
out.writeOptionalBoolean(requestCache);
out.writeBoolean(allowPartialSearchResults);
out.writeVInt(numberOfShards);
out.writeVLong(nowInMillis);
out.writeOptionalString(clusterAlias);
out.writeTimeValue(waitForCheckpointsTimeout);
out.writeList(shards);
}

public List<Shard> getShardLevelRequests() {
return shards;
}

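// Expands this batched node-level request back into the individual per-shard search requests
// on which the shard-level can-match checks are run.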
public List<ShardSearchRequest> createShardSearchRequests() {
return shards.stream().map(this::createShardSearchRequest).collect(Collectors.toList());
}

public ShardSearchRequest createShardSearchRequest(Shard r) {
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
new OriginalIndices(r.indices, indicesOptions), r.shardId, r.shardRequestIndex, numberOfShards, searchType,
source, types, requestCache, r.aliasFilter, r.indexBoost, allowPartialSearchResults, scroll,
nowInMillis, clusterAlias, r.readerId, r.keepAlive, r.waitForCheckpoint, waitForCheckpointsTimeout
);
shardSearchRequest.setParentTask(getParentTask());
return shardSearchRequest;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
public String getDescription() {
// Shard id is enough here, the request itself can be found by looking at the parent task description
return "shardIds[" + shards.stream().map(slr -> slr.shardId).collect(Collectors.toList()) + "]";
}

}