Use IndexShard from reader context (#57384)
Previously, we resolved IndexService and IndexShard once when creating a
search context. However, since #51282, we resolve them in every phase
because we recreate a search context for each phase. This change can cause
the dfs_query or fetch phase to fail with ShardNotFoundException where it
previously did not. The behavior is quite subtle, but I think we should
bring it in line with the master branch.
dnhatn authored Jun 4, 2020
1 parent 2d9a281 commit 0d0d647
Showing 7 changed files with 78 additions and 53 deletions.
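For context before the diff: the fix carries the IndexService and IndexShard that were resolved when the reader context was created through every subsequent search phase, instead of re-resolving them per phase. The following is a minimal, hedged sketch of that resolution pattern in Java; every type and field here (ShardResolutionSketch, ShardId, IndexShard, ReaderContext, ShardRequest, openReaders, localShards) is a simplified stand-in for illustration, not the actual Elasticsearch API.

```java
import java.util.Map;
import java.util.Objects;

// Minimal, self-contained sketch of the shard-resolution pattern; the types
// below are simplified stand-ins, not the actual Elasticsearch classes.
final class ShardResolutionSketch {

    record ShardId(String index, int id) {}
    record IndexShard(ShardId shardId) {}
    // A reader context pins the IndexShard (and IndexService) it was opened against.
    record ReaderContext(long id, IndexShard indexShard) {}
    // A per-phase request that may or may not reference an open reader context.
    record ShardRequest(ShardId shardId, Long readerId) {}

    private final Map<Long, ReaderContext> openReaders;  // open reader contexts, keyed by reader id
    private final Map<ShardId, IndexShard> localShards;  // shards currently assigned to this node

    ShardResolutionSketch(Map<Long, ReaderContext> openReaders, Map<ShardId, IndexShard> localShards) {
        this.openReaders = openReaders;
        this.localShards = localShards;
    }

    // If the request carries a reader id, reuse the shard captured when the reader
    // context was created, so later phases (dfs_query, fetch) cannot hit a
    // "shard not found" failure that the query phase did not see. Otherwise fall
    // back to a fresh lookup against the local shard table.
    IndexShard getShard(ShardRequest request) {
        if (request.readerId() != null) {
            ReaderContext reader = openReaders.get(request.readerId());
            return Objects.requireNonNull(reader, "no reader context for id " + request.readerId()).indexShard();
        }
        return Objects.requireNonNull(localShards.get(request.shardId()), "shard not found: " + request.shardId());
    }
}
```

The diff below implements the same idea with the real types: ReaderContext gains an indexService() accessor, and SearchService resolves the shard via a getShard() helper that prefers the shard pinned by an open reader context over a fresh lookup.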
@@ -152,8 +152,6 @@ final class DefaultSearchContext extends SearchContext {
ShardSearchRequest request,
SearchShardTarget shardTarget,
ClusterService clusterService,
IndexService indexService,
IndexShard indexShard,
BigArrays bigArrays,
LongSupplier relativeTimeSupplier,
TimeValue timeout,
@@ -169,8 +167,8 @@ final class DefaultSearchContext extends SearchContext {
this.dfsResult = new DfsSearchResult(readerContext.id(), shardTarget, request);
this.queryResult = new QuerySearchResult(readerContext.id(), shardTarget, request);
this.fetchResult = new FetchSearchResult(readerContext.id(), shardTarget);
this.indexShard = indexShard;
this.indexService = indexService;
this.indexService = readerContext.indexService();
this.indexShard = readerContext.indexShard();
this.clusterService = clusterService;
this.engineSearcher = readerContext.acquireSearcher("search");
this.searcher = new ContextIndexSearcher(engineSearcher.getIndexReader(), engineSearcher.getSimilarity(),
45 changes: 27 additions & 18 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -31,6 +31,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedSupplier;
@@ -320,8 +321,7 @@ protected void doClose() {

public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInContext,
SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard shard = indexService.getShard(request.shardId().id());
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest rewritten) {
@@ -372,8 +372,7 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo
SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard shard = indexService.getShard(request.shardId().id());
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
@@ -388,8 +387,8 @@ public void onResponse(ShardSearchRequest orig) {
// entirely. Otherwise we fork the execution in the search thread pool.
ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);
try (Engine.Searcher searcher = readerContext.acquireSearcher("can_match")) {
QueryShardContext context = indexService.newQueryShardContext(canMatchRequest.shardId().id(), searcher,
canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());
QueryShardContext context = readerContext.indexService().newQueryShardContext(canMatchRequest.shardId().id(),
searcher, canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());
Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);
}
if (canRewriteToMatchNone(canMatchRequest.source())
@@ -433,6 +432,14 @@ public void onFailure(Exception exc) {
});
}

private IndexShard getShard(ShardSearchRequest request) {
if (request.readerId() != null) {
return findReaderContext(request.readerId()).indexShard();
} else {
return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
}
}

private <T> void runAsync(IndexShard shard, CheckedSupplier<T, Exception> command, ActionListener<T> listener) {
Executor executor = getExecutor(shard);
executor.execute(() -> {
@@ -543,6 +550,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
readerContext.setRescoreDocIds(rescoreDocIds);
return searchContext.queryResult();
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
logger.trace("Query phase failed", e);
processFailure(readerContext, e);
throw e;
@@ -580,6 +588,7 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime);
return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
logger.trace("Fetch phase failed", e);
processFailure(readerContext, e);
throw e;
@@ -610,6 +619,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
}
return searchContext.fetchResult();
} catch (Exception e) {
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
logger.trace("Fetch phase failed", e);
processFailure(readerContext, e);
throw e;
@@ -649,10 +659,10 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard shard = indexService.getShard(request.shardId().id());
Engine.SearcherSupplier reader = shard.acquireSearcherSupplier();
return createAndPutReaderContext(request, shard, reader, keepStatesInContext);
return createAndPutReaderContext(request, indexService, shard, reader, keepStatesInContext);
}

final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexShard shard,
final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexService indexService, IndexShard shard,
Engine.SearcherSupplier reader, boolean keepStatesInContext) {
assert request.readerId() == null;
assert request.keepAlive() == null;
@@ -671,13 +681,13 @@ final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexS
final long keepAlive = getKeepAlive(request);
checkKeepAliveLimit(keepAlive);
if (keepStatesInContext || request.scroll() != null) {
readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), shard, reader, request, keepAlive);
readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), indexService, shard, reader, request, keepAlive);
if (request.scroll() != null) {
readerContext.addOnClose(decreaseScrollContexts);
decreaseScrollContexts = null;
}
} else {
readerContext = new ReaderContext(idGenerator.incrementAndGet(), shard, reader, keepAlive,
readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexService, shard, reader, keepAlive,
request.keepAlive() == null);
}
reader = null;
@@ -722,7 +732,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen
try {
searcherSupplier = shard.acquireSearcherSupplier();
readerContext = new ReaderContext(
idGenerator.incrementAndGet(), shard, searcherSupplier, keepAlive.millis(), false);
idGenerator.incrementAndGet(), indexService, shard, searcherSupplier, keepAlive.millis(), false);
final ReaderContext finalReaderContext = readerContext;
searcherSupplier = null; // transfer ownership to reader context
searchOperationListener.onNewReaderContext(readerContext);
@@ -773,9 +783,10 @@ final SearchContext createContext(ReaderContext readerContext,
}

public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId());
Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, reader, -1L, true)) {
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard indexShard = indexService.getShard(request.shardId().getId());
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexService, indexShard, reader, -1L, true)) {
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout);
searchContext.addReleasable(readerContext.markAsUsed());
return searchContext;
@@ -787,11 +798,9 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
boolean success = false;
DefaultSearchContext searchContext = null;
try {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
searchContext = new DefaultSearchContext(reader, request, shardTarget, clusterService, indexService, indexShard,
reader.indexShard().shardId(), request.getClusterAlias(), OriginalIndices.NONE);
searchContext = new DefaultSearchContext(reader, request, shardTarget, clusterService,
bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase, lowLevelCancellation);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
@@ -21,6 +21,7 @@

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.search.RescoreDocIds;
@@ -37,9 +38,9 @@ public class LegacyReaderContext extends ReaderContext {
private Engine.Searcher searcher;
private Releasable onClose;

public LegacyReaderContext(long id, IndexShard indexShard, Engine.SearcherSupplier reader,
public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader,
ShardSearchRequest shardSearchRequest, long keepAliveInMillis) {
super(id, indexShard, reader, keepAliveInMillis, false);
super(id, indexService, indexShard, reader, keepAliveInMillis, false);
assert shardSearchRequest.readerId() == null;
assert shardSearchRequest.keepAlive() == null;
this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest);
@@ -23,6 +23,7 @@
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.search.RescoreDocIds;
@@ -45,6 +46,7 @@
*/
public class ReaderContext implements Releasable {
private final SearchContextId id;
private final IndexService indexService;
private final IndexShard indexShard;
protected final Engine.SearcherSupplier searcherSupplier;
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -62,11 +64,13 @@ public class ReaderContext implements Releasable {
private Map<String, Object> context;

public ReaderContext(long id,
IndexService indexService,
IndexShard indexShard,
Engine.SearcherSupplier searcherSupplier,
long keepAliveInMillis,
boolean singleSession) {
this.id = new SearchContextId(UUIDs.base64UUID(), id);
this.indexService = indexService;
this.indexShard = indexShard;
this.searcherSupplier = searcherSupplier;
this.singleSession = singleSession;
@@ -103,6 +107,9 @@ public SearchContextId id() {
return id;
}

public IndexService indexService() {
return indexService;
}

public IndexShard indexShard() {
return indexShard;
@@ -138,9 +138,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);

ReaderContext readerWithoutScroll = new ReaderContext(
randomNonNegativeLong(), indexShard, engineReader, randomNonNegativeLong(), false);
randomNonNegativeLong(), indexService, indexShard, engineReader, randomNonNegativeLong(), false);
DefaultSearchContext contextWithoutScroll = new DefaultSearchContext(readerWithoutScroll, shardSearchRequest, target, null,
indexService, indexShard, bigArrays, null, timeout, null, false);
bigArrays, null, timeout, null, false);
contextWithoutScroll.from(300);
contextWithoutScroll.close();

@@ -154,9 +154,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
// resultWindow greater than maxResultWindow and scrollContext isn't null
when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMillis(randomInt(1000))));
ReaderContext readerContext = new LegacyReaderContext(
randomNonNegativeLong(), indexShard, engineReader, shardSearchRequest, randomNonNegativeLong());
DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService,
indexShard, bigArrays, null, timeout, null, false);
randomNonNegativeLong(), indexService, indexShard, engineReader, shardSearchRequest, randomNonNegativeLong());
DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null,
bigArrays, null, timeout, null, false);
context1.from(300);
exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false));
assertThat(exception.getMessage(), equalTo("Batch size is too large, size must be less than or equal to: ["
@@ -186,10 +186,11 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
+ "] index level setting."));

readerContext.close();
readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineReader, randomNonNegativeLong(), false);
readerContext = new ReaderContext(
randomNonNegativeLong(), indexService, indexShard, engineReader, randomNonNegativeLong(), false);
// rescore is null but sliceBuilder is not null
DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target,
null, indexService, indexShard, bigArrays, null, timeout, null, false);
null, bigArrays, null, timeout, null, false);

SliceBuilder sliceBuilder = mock(SliceBuilder.class);
int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100);
@@ -206,7 +207,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST);

DefaultSearchContext context3 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null,
indexService, indexShard, bigArrays, null, timeout, null, false);
bigArrays, null, timeout, null, false);
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
@@ -216,9 +217,10 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]);

readerContext.close();
readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineReader, randomNonNegativeLong(), false);
DefaultSearchContext context4 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null,
indexService, indexShard, bigArrays, null, timeout, null, false);
readerContext =
new ReaderContext(randomNonNegativeLong(), indexService, indexShard, engineReader, randomNonNegativeLong(), false);
DefaultSearchContext context4 =
new DefaultSearchContext(readerContext, shardSearchRequest, target, null, bigArrays, null, timeout, null, false);
context4.sliceBuilder(new SliceBuilder(1,2)).parsedQuery(parsedQuery).preProcess(false);
Query query1 = context4.query();
context4.sliceBuilder(new SliceBuilder(0,2)).parsedQuery(parsedQuery).preProcess(false);