Skip to content

Commit

Permalink
Address reviews
Browse files Browse the repository at this point in the history
Rename remapShardId to shardRequestOrdinal.
Compute request ordinal per shard and number of request shard per index only once.
  • Loading branch information
jimczi committed Apr 19, 2018
1 parent b45f64e commit b90716c
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -319,11 +318,9 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
int[] indexShards = getIndexShards(shardIt.shardId().getIndex());
int remapShardId = Arrays.binarySearch(indexShards, shardIt.shardId().getId());
assert remapShardId >= 0;
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), remapShardId,
indexShards.length, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(),
getShardRequestOrdinal(shardIt.shardId()), getNumberOfRequestShards(shardIt.shardId().getIndex()),
getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
Expand All @@ -52,6 +53,8 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
private final SearchRequest request;
private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
private final Map<ShardId, Integer> shardRequestOrdinals;
private final Map<String, Integer> numberOfRequestShardsPerIndex;
private final Logger logger;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
Expand All @@ -65,10 +68,18 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
this.request = request;
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
final List<SearchShardIterator> iterators = new ArrayList<>();
this.shardRequestOrdinals = new HashMap<>(iterators.size());
this.numberOfRequestShardsPerIndex = new HashMap<>();
Map<String, AtomicInteger> shardOrdMap = new HashMap<>();
for (final SearchShardIterator iterator : shardsIts) {
if (iterator.skip()) {
toSkipIterators.add(iterator);
} else {
ShardId shardId = iterator.shardId();
Index index = shardId.getIndex();
AtomicInteger shardOrd = shardOrdMap.computeIfAbsent(index.getUUID(), (a) -> new AtomicInteger(0));
shardRequestOrdinals.put(iterator.shardId(), shardOrd.getAndIncrement());
numberOfRequestShardsPerIndex.put(index.getUUID(), shardOrd.intValue());
iterators.add(iterator);
}
}
Expand Down Expand Up @@ -381,17 +392,18 @@ protected void skipShard(SearchShardIterator iterator) {
}

/**
* Returns the list of shard ids in the request that match the provided {@link Index}.
* Returns the shard request ordinal for the provided <code>shardId</code>.
*/
protected int[] getIndexShards(Index index) {
List<Integer> shards = new ArrayList<>();
for (ShardIterator it : shardsIts) {
if (index.equals(it.shardId().getIndex())) {
shards.add(it.shardId().getId());
}
}
Collections.sort(shards);
return shards.stream().mapToInt((i) -> i).toArray();
protected int getShardRequestOrdinal(ShardId shardId) {
return shardRequestOrdinals.get(shardId);
}

/**
* Return the number of requested shards for the provided <code>index</code>.
*/
protected int getNumberOfRequestShards(Index index) {
return numberOfRequestShardsPerIndex.get(index.getUUID());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ && new NestedHelper(mapperService()).mightMatchNestedDocs(query)
}

if (sliceBuilder != null) {
filters.add(sliceBuilder.toFilter(queryShardContext, remapShardId(), numberOfIndexShards(), minNodeVersion));
filters.add(sliceBuilder.toFilter(queryShardContext, shardRequestOrdinal(), numberOfIndexShards(), minNodeVersion));
}

if (filters.isEmpty()) {
Expand Down Expand Up @@ -341,8 +341,8 @@ public int numberOfIndexShards() {
return request.numberOfIndexShards();
}

public int remapShardId() {
return request.remapShardId();
public int shardRequestOrdinal() {
return request.shardRequestOrdinal();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -64,7 +61,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {

private String clusterAlias;
private ShardId shardId;
private int remapShardId;
private int shardRequestOrdinal;
private int numberOfIndexShards;
private int numberOfShards;
private SearchType searchType;
Expand All @@ -82,9 +79,9 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
ShardSearchLocalRequest() {
}

ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int remapShardId, int numberOfIndexShards, int numberOfShards,
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int shardRequestOrdinal, int numberOfIndexShards, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
this(shardId, remapShardId, numberOfIndexShards, numberOfShards, searchRequest.searchType(),
this(shardId, shardRequestOrdinal, numberOfIndexShards, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
searchRequest.allowPartialSearchResults());
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
Expand All @@ -103,11 +100,11 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis
indexBoost = 1.0f;
}

public ShardSearchLocalRequest(ShardId shardId, int remapShardId, int numberOfIndexShards, int numberOfShards,
public ShardSearchLocalRequest(ShardId shardId, int shardRequestOrdinal, int numberOfIndexShards, int numberOfShards,
SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
this.shardId = shardId;
this.remapShardId = remapShardId;
this.shardRequestOrdinal = shardRequestOrdinal;
this.numberOfIndexShards = numberOfIndexShards;
this.numberOfShards = numberOfShards;
this.searchType = searchType;
Expand Down Expand Up @@ -160,8 +157,8 @@ public int numberOfIndexShards() {
}

@Override
public int remapShardId() {
return remapShardId;
public int shardRequestOrdinal() {
return shardRequestOrdinal;
}

@Override
Expand Down Expand Up @@ -214,11 +211,11 @@ protected void innerReadFrom(StreamInput in) throws IOException {
searchType = SearchType.fromId(in.readByte());
numberOfShards = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
remapShardId = in.readVInt();
shardRequestOrdinal = in.readVInt();
numberOfIndexShards = in.readVInt();
assert remapShardId != -1 && numberOfIndexShards != -1;
assert shardRequestOrdinal != -1 && numberOfIndexShards != -1;
} else {
remapShardId = -1;
shardRequestOrdinal = -1;
numberOfIndexShards = -1;
}
scroll = in.readOptionalWriteable(Scroll::new);
Expand Down Expand Up @@ -255,7 +252,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException
if (!asKey) {
out.writeVInt(numberOfShards);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVInt(remapShardId);
out.writeVInt(shardRequestOrdinal);
out.writeVInt(numberOfIndexShards);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ public interface ShardSearchRequest {
ShardId shardId();

/**
* Returns the remapped shard id of the requested shard for this request
* Returns the shard request ordinal of the shard for this request
* or -1 if this information is not available.
* The remapped shard id is the id of the requested shard among all shards
* of this index that are part of the request. Note that the remapped shard id
* is equal to the original shard id if all shards of this index are part of the request.
* The request shard ordinal is the id of the requested shard among all shards
* of this index that are part of the request. Note that the request shard ordinal
* is equal to the shard id if all shards of the index are part of the request.
*/
int remapShardId();
int shardRequestOrdinal();

String[] types();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
Expand All @@ -56,10 +53,10 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public ShardSearchTransportRequest(){
}

public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int remapShardId,
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int shardRequestOrdinal,
int numberOfIndexShards, int numberOfShards, AliasFilter aliasFilter, float indexBoost,
long nowInMillis, String clusterAlias) {
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, remapShardId,
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, shardRequestOrdinal,
numberOfIndexShards, numberOfShards, aliasFilter, indexBoost,
nowInMillis, clusterAlias);
this.originalIndices = originalIndices;
Expand Down Expand Up @@ -105,8 +102,8 @@ public ShardId shardId() {
}

@Override
public int remapShardId() {
return shardSearchLocalRequest.remapShardId();
public int shardRequestOrdinal() {
return shardSearchLocalRequest.shardRequestOrdinal();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,22 @@ public int hashCode() {
* Converts this QueryBuilder to a lucene {@link Query}.
*
* @param context Additional information needed to build the query
* @param remapShardId The shardId of this shard for this request
* or -1 if this information is not available.
* @param shardRequestOrdinal The shard ordinal of for this request or -1 if this information is not available.
* @param numIndexShards The number of shards of this index that participates in the request
* or -1 if this information is not available.
* @param minNodeVersion The version of the node with the youngest version in the cluster.
*/
public Query toFilter(QueryShardContext context, int remapShardId, int numIndexShards, Version minNodeVersion) {
public Query toFilter(QueryShardContext context, int shardRequestOrdinal, int numIndexShards, Version minNodeVersion) {
final int numShards;
final int shardId;
if (remapShardId != -1 && minNodeVersion.onOrAfter(Version.V_7_0_0_alpha1)) {
if (shardRequestOrdinal != -1 && minNodeVersion.onOrAfter(Version.V_7_0_0_alpha1)) {
/**
* We use the remapped shard id (added in {@link Version#V_7_0_0_alpha1} only if all nodes
* We use the request shard ordinal (added in {@link Version#V_7_0_0_alpha1} only if all nodes
* are able to pass this information otherwise another slice might use the original shard
* id and that would lead to duplicated results.
*/
assert numIndexShards != -1;
shardId = remapShardId;
shardId = shardRequestOrdinal;
numShards = numIndexShards;
} else {
shardId = context.getShardId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void testBuildShardSearchTransportRequest() {
assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices());
assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.getAliasFilter().getQueryBuilder());
assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f);
assertEquals(0, shardSearchTransportRequest.remapShardId());
assertEquals(0, shardSearchTransportRequest.shardRequestOrdinal());
assertEquals(1, shardSearchTransportRequest.numberOfIndexShards());
assertEquals(1, shardSearchTransportRequest.numberOfShards());
}
Expand All @@ -149,7 +149,7 @@ public void testBuildFilteredShardSearchTransportRequest() {
int numIndex = randomIntBetween(1, 5);
List<SearchShardIterator> shards = new ArrayList<>();
int[] numIndexShards = new int[numIndex];
Map<Integer, Map<Integer, Integer>> remapShards = new HashMap<>();
Map<Integer, Map<Integer, Integer>> shardOrdinals = new HashMap<>();
int totalShards = 0;
for (int i = 0; i < numIndex; i++) {
int numShards = randomIntBetween(1, 10);
Expand All @@ -165,7 +165,7 @@ public void testBuildFilteredShardSearchTransportRequest() {
totalShards++;
}
}
remapShards.put(i, shardMap);
shardOrdinals.put(i, shardMap);
}
Collections.shuffle(shards, random());
GroupShardsIterator<SearchShardIterator> shardsIt = new GroupShardsIterator<>(shards);
Expand All @@ -174,7 +174,7 @@ public void testBuildFilteredShardSearchTransportRequest() {
for (int i = 0; i < numIndex; i++) {
int numShards = numIndexShards[i];
String indexName = Integer.toString(i);
Map<Integer, Integer> shardMap = remapShards.get(i);
Map<Integer, Integer> shardMap = shardOrdinals.get(i);
if (shardMap.size() == 0) {
continue;
}
Expand All @@ -186,7 +186,7 @@ public void testBuildFilteredShardSearchTransportRequest() {
Collections.emptyList(), new OriginalIndices(new String[]{"name", "name1"}, IndicesOptions.strictExpand()));
ShardSearchTransportRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator);
assertThat(shardSearchTransportRequest.numberOfIndexShards(), equalTo(shardMap.size()));
assertThat(shardSearchTransportRequest.remapShardId(), equalTo(shardMap.get(j)));
assertThat(shardSearchTransportRequest.shardRequestOrdinal(), equalTo(shardMap.get(j)));
assertThat(shardSearchTransportRequest.numberOfShards(), equalTo(totalShards));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ShardId shardId() {
}

@Override
public int remapShardId() {
public int shardRequestOrdinal() {
return 0;
}

Expand Down

0 comments on commit b90716c

Please sign in to comment.