Clarify SourceLookup sharing across fetch subphases. (#60484)
The `SourceLookup` class provides access to the _source for a particular
document, specified through `SourceLookup#setSegmentAndDocument`. Previously
the search context contained a single `SourceLookup` that was shared between
different fetch subphases. It was hard to reason about its state: is
`SourceLookup` set to the expected document? Is the _source already loaded and
available?

Instead of using a global source lookup, the fetch hit context now provides
access to a lookup that is set to load from the hit document.

This refactor closes #31000, since the same `SourceLookup` is no longer shared
between the 'fetch _source phase' and script execution.
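
To make the new contract concrete, here is a minimal sketch (not part of this commit) of a fetch subphase reading _source after this change. The subphase class and the "message" field are hypothetical; `hitContext.sourceLookup()` and the `FetchSubPhase#hitExecute` signature appear in this diff, while `SourceLookup#loadSourceIfNeeded` is assumed from the surrounding codebase.

import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;

import java.util.Map;

// Hypothetical subphase, for illustration only.
public class ExampleSourceSubPhase implements FetchSubPhase {

    @Override
    public void hitExecute(SearchContext context, HitContext hitContext) {
        // Before this commit: context.lookup().source(), a single lookup whose
        // segment/doc state depended on whichever subphase ran earlier.
        // After: a per-hit lookup that FetchPhase has already positioned on the
        // hit's LeafReaderContext and doc ID, usually with _source preloaded.
        SourceLookup source = hitContext.sourceLookup();

        // Parses the preloaded _source bytes on first access; in the common
        // case no extra read of stored fields is needed.
        Map<String, Object> sourceAsMap = source.loadSourceIfNeeded();
        if (sourceAsMap.containsKey("message")) {
            // ... decorate hitContext.hit() as needed ...
        }
    }
}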
jtibshirani authored Jul 30, 2020
1 parent da69644 commit dfd7f22
Showing 19 changed files with 114 additions and 115 deletions.
@@ -98,12 +98,11 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) throws IOException
SearchHighlightContext highlight = new SearchHighlightContext(context.highlight().fields(), true);
QueryShardContext shardContext = new QueryShardContext(context.getQueryShardContext());
shardContext.freezeContext();
shardContext.lookup().source().setSegmentAndDocument(percolatorLeafReaderContext, slot);
shardContext.lookup().source().setSource(document);
hitContext.reset(
new SearchHit(slot, "unknown", new Text(hit.getType()), Collections.emptyMap(), Collections.emptyMap()),
percolatorLeafReaderContext, slot, percolatorIndexSearcher
);
hitContext.sourceLookup().setSource(document);
hitContext.cache().clear();
highlightPhase.hitExecute(context.shardTarget(), shardContext, query, highlight, hitContext);
for (Map.Entry<String, HighlightField> entry : hitContext.hit().getHighlightFields().entrySet()) {
@@ -26,7 +26,6 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedHighlighterAnalyzer;
import org.elasticsearch.index.mapper.annotatedtext.AnnotatedTextFieldMapper.AnnotatedText;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.fetch.FetchSubPhase.HitContext;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext.Field;

@@ -47,10 +46,9 @@ protected Analyzer getAnalyzer(DocumentMapper docMapper, HitContext hitContext)
@Override
protected List<Object> loadFieldValues(MappedFieldType fieldType,
Field field,
QueryShardContext context,
HitContext hitContext,
boolean forceSource) throws IOException {
List<Object> fieldValues = super.loadFieldValues(fieldType, field, context, hitContext, forceSource);
List<Object> fieldValues = super.loadFieldValues(fieldType, field, hitContext, forceSource);
String[] fieldValuesAsString = fieldValues.toArray(new String[fieldValues.size()]);

AnnotatedText[] annotations = new AnnotatedText[fieldValuesAsString.length];
@@ -96,7 +96,7 @@ protected void setupInnerHitsContext(QueryShardContext queryShardContext,
for (SearchSourceBuilder.ScriptField field : innerHitBuilder.getScriptFields()) {
QueryShardContext innerContext = innerHitsContext.getQueryShardContext();
FieldScript.Factory factory = innerContext.compile(field.script(), FieldScript.CONTEXT);
FieldScript.LeafFactory fieldScript = factory.newFactory(field.script().getParams(), innerHitsContext.lookup());
FieldScript.LeafFactory fieldScript = factory.newFactory(field.script().getParams(), innerContext.lookup());
innerHitsContext.scriptFields().add(new org.elasticsearch.search.fetch.subphase.ScriptFieldsContext.ScriptField(
field.fieldName(), fieldScript, field.ignoreFailure()));
}
@@ -96,6 +96,7 @@
import org.elasticsearch.search.internal.SearchContext.Lifetime;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchRequest;
@@ -943,7 +944,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder source
}
for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
FieldScript.Factory factory = scriptService.compile(field.script(), FieldScript.CONTEXT);
FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), context.lookup());
SearchLookup lookup = context.getQueryShardContext().lookup();
FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), lookup);
context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure()));
}
}
103 changes: 63 additions & 40 deletions server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
@@ -164,19 +164,18 @@ public void execute(SearchContext context) {
LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
int subDocId = docId - subReaderContext.docBase;

final SearchHit searchHit;
int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
if (rootDocId != -1) {
searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId,
prepareNestedHitContext(hitContext, context, docId, subDocId, rootDocId,
storedToRequestedFields, subReaderContext);
} else {
searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId,
prepareHitContext(hitContext, context, fieldsVisitor, docId, subDocId,
storedToRequestedFields, subReaderContext);
}

SearchHit searchHit = hitContext.hit();
sortedHits[index] = searchHit;
hits[docs[index].index] = searchHit;
hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());
for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
fetchSubPhase.hitExecute(context, hitContext);
}
@@ -226,43 +225,64 @@ private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId)
return -1;
}

private SearchHit createSearchHit(SearchContext context,
FieldsVisitor fieldsVisitor,
int docId,
int subDocId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext) {
/**
* Resets the provided {@link FetchSubPhase.HitContext} with information on the current
* document. This includes the following:
* - Adding an initial {@link SearchHit} instance.
* - Loading the document source and setting it on {@link SourceLookup}. This allows
* fetch subphases that use the hit context to access the preloaded source.
*/
private void prepareHitContext(FetchSubPhase.HitContext hitContext,
SearchContext context,
FieldsVisitor fieldsVisitor,
int docId,
int subDocId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext) {
DocumentMapper documentMapper = context.mapperService().documentMapper();
Text typeText = documentMapper.typeText();

if (fieldsVisitor == null) {
return new SearchHit(docId, null, typeText, null, null);
}
loadStoredFields(context.shardTarget(), subReaderContext, fieldsVisitor, subDocId);
fieldsVisitor.postProcess(context.mapperService());
SearchHit searchHit;
if (fieldsVisitor.fields().isEmpty() == false) {
Map<String, DocumentField> docFields = new HashMap<>();
Map<String, DocumentField> metaFields = new HashMap<>();
fillDocAndMetaFields(context, fieldsVisitor, storedToRequestedFields, docFields, metaFields);
searchHit = new SearchHit(docId, fieldsVisitor.uid().id(), typeText, docFields, metaFields);
SearchHit hit = new SearchHit(docId, null, typeText, null, null);
hitContext.reset(hit, subReaderContext, subDocId, context.searcher());
} else {
searchHit = new SearchHit(docId, fieldsVisitor.uid().id(), typeText, emptyMap(), emptyMap());
}
// Set _source if requested.
SourceLookup sourceLookup = context.lookup().source();
sourceLookup.setSegmentAndDocument(subReaderContext, subDocId);
if (fieldsVisitor.source() != null) {
sourceLookup.setSource(fieldsVisitor.source());
SearchHit hit;
loadStoredFields(context.shardTarget(), subReaderContext, fieldsVisitor, subDocId);
fieldsVisitor.postProcess(context.mapperService());
Uid uid = fieldsVisitor.uid();
if (fieldsVisitor.fields().isEmpty() == false) {
Map<String, DocumentField> docFields = new HashMap<>();
Map<String, DocumentField> metaFields = new HashMap<>();
fillDocAndMetaFields(context, fieldsVisitor, storedToRequestedFields, docFields, metaFields);
hit = new SearchHit(docId, uid.id(), typeText, docFields, metaFields);
} else {
hit = new SearchHit(docId, uid.id(), typeText, emptyMap(), emptyMap());
}

hitContext.reset(hit, subReaderContext, subDocId, context.searcher());
if (fieldsVisitor.source() != null) {
hitContext.sourceLookup().setSource(fieldsVisitor.source());
}
}
return searchHit;
}

private SearchHit createNestedSearchHit(SearchContext context,
int nestedTopDocId,
int nestedSubDocId,
int rootSubDocId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext) throws IOException {
/**
* Resets the provided {@link FetchSubPhase.HitContext} with information on the current
* nested document. This includes the following:
* - Adding an initial {@link SearchHit} instance.
* - Loading the document source, filtering it based on the nested document ID, then
* setting it on {@link SourceLookup}. This allows fetch subphases that use the hit
* context to access the preloaded source.
*/
@SuppressWarnings("unchecked")
private void prepareNestedHitContext(FetchSubPhase.HitContext hitContext,
SearchContext context,
int nestedTopDocId,
int nestedSubDocId,
int rootSubDocId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext) throws IOException {
// Also if highlighting is requested on nested documents we need to fetch the _source from the root document,
// otherwise highlighting will attempt to fetch the _source from the nested doc, which will fail,
// because the entire _source is only stored with the root document.
@@ -295,16 +315,19 @@ private SearchHit createNestedSearchHit(SearchContext context,
}

DocumentMapper documentMapper = context.mapperService().documentMapper();
SourceLookup sourceLookup = context.lookup().source();
sourceLookup.setSegmentAndDocument(subReaderContext, nestedSubDocId);
Text typeText = documentMapper.typeText();

ObjectMapper nestedObjectMapper = documentMapper.findNestedObjectMapper(nestedSubDocId, context, subReaderContext);
assert nestedObjectMapper != null;
SearchHit.NestedIdentity nestedIdentity =
getInternalNestedIdentity(context, nestedSubDocId, subReaderContext, context.mapperService(), nestedObjectMapper);

SearchHit hit = new SearchHit(nestedTopDocId, uid.id(), typeText, nestedIdentity, docFields, metaFields);
hitContext.reset(hit, subReaderContext, nestedSubDocId, context.searcher());

if (source != null) {
Tuple<XContentType, Map<String, Object>> tuple = XContentHelper.convertToMap(source, true);
XContentType contentType = tuple.v1();
Map<String, Object> sourceAsMap = tuple.v2();

// Isolate the nested json array object that matches with nested hit and wrap it back into the same json
@@ -347,11 +370,11 @@ private SearchHit createNestedSearchHit(SearchContext context,
current = next;
}
}
context.lookup().source().setSource(nestedSourceAsMap);
XContentType contentType = tuple.v1();
context.lookup().source().setSourceContentType(contentType);

hitContext.sourceLookup().setSource(nestedSourceAsMap);
hitContext.sourceLookup().setSourceContentType(contentType);
}
return new SearchHit(nestedTopDocId, uid.id(), documentMapper.typeText(), nestedIdentity, docFields, metaFields);

}

private SearchHit.NestedIdentity getInternalNestedIdentity(SearchContext context, int nestedSubDocId,
@@ -24,6 +24,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;

import java.io.IOException;
import java.util.HashMap;
@@ -39,13 +40,15 @@ class HitContext {
private IndexSearcher searcher;
private LeafReaderContext readerContext;
private int docId;
private final SourceLookup sourceLookup = new SourceLookup();
private Map<String, Object> cache;

public void reset(SearchHit hit, LeafReaderContext context, int docId, IndexSearcher searcher) {
this.hit = hit;
this.readerContext = context;
this.docId = docId;
this.searcher = searcher;
this.sourceLookup.setSegmentAndDocument(context, docId);
}

public SearchHit hit() {
@@ -64,6 +67,17 @@ public int docId() {
return docId;
}

/**
* This lookup provides access to the source for the given hit document. Note
* that it should always be set to the correct doc ID and {@link LeafReaderContext}.
*
* In most cases, the hit document's source is loaded eagerly at the start of the
* {@link FetchPhase}. This lookup will contain the preloaded source.
*/
public SourceLookup sourceLookup() {
return sourceLookup;
}

public IndexReader topLevelReader() {
return searcher.getIndexReader();
}
@@ -44,7 +44,7 @@ public void hitExecute(SearchContext context, HitContext hitContext) {
}

SearchHit hit = hitContext.hit();
SourceLookup sourceLookup = context.lookup().source();
SourceLookup sourceLookup = hitContext.sourceLookup();
FieldValueRetriever fieldValueRetriever = fetchFieldsContext.fieldValueRetriever();

Set<String> ignoredFields = getIgnoredFields(hit);
@@ -39,7 +39,7 @@ public void hitExecute(SearchContext context, HitContext hitContext) {
return;
}
final boolean nestedHit = hitContext.hit().getNestedIdentity() != null;
SourceLookup source = context.lookup().source();
SourceLookup source = hitContext.sourceLookup();
FetchSourceContext fetchSourceContext = context.fetchSourceContext();
assert fetchSourceContext.fetchSource();

@@ -66,10 +66,7 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) throws IOException
}
innerHits.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
innerHits.setUid(new Uid(hit.getType(), hit.getId()));
innerHits.lookup().source().setSource(context.lookup().source().internalSourceRef());
if (context.lookup().source().source() != null) {
innerHits.lookup().source().setSource(context.lookup().source().source());
}

fetchPhase.execute(innerHits);
FetchSearchResult fetchResult = innerHits.fetchResult();
SearchHit[] internalHits = fetchResult.fetchResult().hits().getHits();
@@ -39,7 +39,6 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.TextSearchInfo;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.fetch.FetchPhaseExecutionException;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext.Field;
@@ -71,7 +70,6 @@ public FastVectorHighlighter(Settings settings) {
@Override
public HighlightField highlight(FieldHighlightContext fieldContext) {
SearchHighlightContext.Field field = fieldContext.field;
QueryShardContext context = fieldContext.context;
FetchSubPhase.HitContext hitContext = fieldContext.hitContext;
MappedFieldType fieldType = fieldContext.fieldType;
boolean forceSource = fieldContext.forceSource;
@@ -104,7 +102,7 @@ public HighlightField highlight(FieldHighlightContext fieldContext) {
fragmentsBuilder = new SimpleFragmentsBuilder(fieldType, field.fieldOptions().preTags(),
field.fieldOptions().postTags(), boundaryScanner);
} else {
fragmentsBuilder = new SourceSimpleFragmentsBuilder(fieldType, context,
fragmentsBuilder = new SourceSimpleFragmentsBuilder(fieldType, hitContext.sourceLookup(),
field.fieldOptions().preTags(), field.fieldOptions().postTags(), boundaryScanner);
}
} else {
@@ -115,7 +113,7 @@ public HighlightField highlight(FieldHighlightContext fieldContext) {
fragmentsBuilder = new ScoreOrderFragmentsBuilder(field.fieldOptions().preTags(),
field.fieldOptions().postTags(), boundaryScanner);
} else {
fragmentsBuilder = new SourceScoreOrderFragmentsBuilder(fieldType, context,
fragmentsBuilder = new SourceScoreOrderFragmentsBuilder(fieldType, hitContext.sourceLookup(),
field.fieldOptions().preTags(), field.fieldOptions().postTags(), boundaryScanner);
}
} else {
@@ -124,8 +122,8 @@
field.fieldOptions().postTags(), boundaryScanner);
} else {
fragmentsBuilder =
new SourceSimpleFragmentsBuilder(fieldType, context, field.fieldOptions().preTags(),
field.fieldOptions().postTags(), boundaryScanner);
new SourceSimpleFragmentsBuilder(fieldType, hitContext.sourceLookup(),
field.fieldOptions().preTags(), field.fieldOptions().postTags(), boundaryScanner);
}
}
}