Skip to content

Commit

Permalink
Introduce FetchContext (#62357)
Browse files Browse the repository at this point in the history
We currently pass a SearchContext around to share configuration among
FetchSubPhases. With the introduction of runtime fields, it would be useful
to start storing some state on this context to be shared between different
subphases (for example, stored fields or search lookups can be loaded lazily
but referred to by many different subphases). However, SearchContext is a
very large and unwieldy class, and adding more methods or state here feels
like a bridge too far.

This commit introduces a new FetchContext class that exposes only those
methods on SearchContext that are required for fetch phases. This reduces
the API surface area for fetch phases considerably, and should give us some
leeway to add further state.
  • Loading branch information
romseygeek committed Sep 17, 2020
1 parent d091c12 commit 63afc61
Show file tree
Hide file tree
Showing 25 changed files with 327 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightPhase;
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.lookup.SourceLookup;

Expand All @@ -57,11 +56,11 @@ final class PercolatorHighlightSubFetchPhase implements FetchSubPhase {
}

@Override
public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLookup lookup) throws IOException {
if (searchContext.highlight() == null) {
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext, SearchLookup lookup) {
if (fetchContext.highlight() == null) {
return null;
}
List<PercolateQuery> percolateQueries = locatePercolatorQuery(searchContext.query());
List<PercolateQuery> percolateQueries = locatePercolatorQuery(fetchContext.query());
if (percolateQueries.isEmpty()) {
return null;
}
Expand All @@ -70,7 +69,7 @@ public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLo
LeafReaderContext ctx;

@Override
public void setNextReader(LeafReaderContext readerContext) throws IOException {
public void setNextReader(LeafReaderContext readerContext) {
this.ctx = readerContext;
}

Expand Down Expand Up @@ -111,10 +110,8 @@ public void process(HitContext hit) throws IOException {
);
subContext.sourceLookup().setSource(document);
// force source because MemoryIndex does not store fields
SearchHighlightContext highlight = new SearchHighlightContext(searchContext.highlight().fields(), true);
QueryShardContext shardContext = new QueryShardContext(searchContext.getQueryShardContext());
FetchSubPhaseProcessor processor = highlightPhase.getProcessor(shardContext, searchContext.shardTarget(),
highlight, query);
SearchHighlightContext highlight = new SearchHighlightContext(fetchContext.highlight().fields(), true);
FetchSubPhaseProcessor processor = highlightPhase.getProcessor(fetchContext, highlight, query);
processor.process(subContext);
for (Map.Entry<String, HighlightField> entry : subContext.hit().getHighlightFields().entrySet()) {
if (percolateQuery.getDocuments().size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import org.elasticsearch.Version;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SearchLookup;

import java.io.IOException;
Expand All @@ -58,10 +58,10 @@ final class PercolatorMatchedSlotSubFetchPhase implements FetchSubPhase {
static final String FIELD_NAME_PREFIX = "_percolator_document_slot";

@Override
public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLookup lookup) throws IOException {
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext, SearchLookup lookup) throws IOException {

List<PercolateContext> percolateContexts = new ArrayList<>();
List<PercolateQuery> percolateQueries = locatePercolatorQuery(searchContext.query());
List<PercolateQuery> percolateQueries = locatePercolatorQuery(fetchContext.query());
boolean singlePercolateQuery = percolateQueries.size() == 1;
for (PercolateQuery pq : percolateQueries) {
percolateContexts.add(new PercolateContext(pq, singlePercolateQuery));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,33 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.common.lucene.search.function.RandomScoreFunction;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;

public class PercolatorHighlightSubFetchPhaseTests extends ESTestCase {

public void testHitsExecutionNeeded() throws IOException {
public void testHitsExecutionNeeded() {
PercolateQuery percolateQuery = new PercolateQuery("_name", ctx -> null, Collections.singletonList(new BytesArray("{}")),
new MatchAllDocsQuery(), Mockito.mock(IndexSearcher.class), null, new MatchAllDocsQuery());
PercolatorHighlightSubFetchPhase subFetchPhase = new PercolatorHighlightSubFetchPhase(emptyMap());
SearchContext searchContext = Mockito.mock(SearchContext.class);
Mockito.when(searchContext.highlight()).thenReturn(new SearchHighlightContext(Collections.emptyList()));
Mockito.when(searchContext.query()).thenReturn(new MatchAllDocsQuery());
FetchContext fetchContext = mock(FetchContext.class);
Mockito.when(fetchContext.highlight()).thenReturn(new SearchHighlightContext(Collections.emptyList()));
Mockito.when(fetchContext.query()).thenReturn(new MatchAllDocsQuery());

assertNull(subFetchPhase.getProcessor(searchContext, null));
Mockito.when(searchContext.query()).thenReturn(percolateQuery);
assertNotNull(subFetchPhase.getProcessor(searchContext, null));
assertNull(subFetchPhase.getProcessor(fetchContext, null));
Mockito.when(fetchContext.query()).thenReturn(percolateQuery);
assertNotNull(subFetchPhase.getProcessor(fetchContext, null));
}

public void testLocatePercolatorQuery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.ScoreDoc;
Expand All @@ -37,9 +36,9 @@
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase.HitContext;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -63,7 +62,6 @@ public void testHitsExecute() throws Exception {
PercolatorMatchedSlotSubFetchPhase phase = new PercolatorMatchedSlotSubFetchPhase();

try (DirectoryReader reader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(reader);
LeafReaderContext context = reader.leaves().get(0);
// A match:
{
Expand All @@ -75,7 +73,7 @@ public void testHitsExecute() throws Exception {
PercolateQuery percolateQuery = new PercolateQuery("_name", queryStore, Collections.emptyList(),
new MatchAllDocsQuery(), memoryIndex.createSearcher(), null, new MatchNoDocsQuery());

SearchContext sc = mock(SearchContext.class);
FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);

FetchSubPhaseProcessor processor = phase.getProcessor(sc, null);
Expand All @@ -96,7 +94,7 @@ public void testHitsExecute() throws Exception {
PercolateQuery percolateQuery = new PercolateQuery("_name", queryStore, Collections.emptyList(),
new MatchAllDocsQuery(), memoryIndex.createSearcher(), null, new MatchNoDocsQuery());

SearchContext sc = mock(SearchContext.class);
FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);

FetchSubPhaseProcessor processor = phase.getProcessor(sc, null);
Expand All @@ -116,7 +114,7 @@ public void testHitsExecute() throws Exception {
PercolateQuery percolateQuery = new PercolateQuery("_name", queryStore, Collections.emptyList(),
new MatchAllDocsQuery(), memoryIndex.createSearcher(), null, new MatchNoDocsQuery());

SearchContext sc = mock(SearchContext.class);
FetchContext sc = mock(FetchContext.class);
when(sc.query()).thenReturn(percolateQuery);

FetchSubPhaseProcessor processor = phase.getProcessor(sc, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ setup:
body:
keyword: [ "a" ]

- do:
indices.refresh:
index: [ test ]

- do:
catch: bad_request
search:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,22 @@

package org.elasticsearch.search.fetch;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.termvectors.TermVectorsService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.SearchExtBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
Expand Down Expand Up @@ -121,21 +117,21 @@ private static final class TermVectorsFetchSubPhase implements FetchSubPhase {
private static final String NAME = "term_vectors_fetch";

@Override
public FetchSubPhaseProcessor getProcessor(SearchContext searchContext, SearchLookup lookup) {
public FetchSubPhaseProcessor getProcessor(FetchContext searchContext, SearchLookup lookup) {
return new FetchSubPhaseProcessor() {
@Override
public void setNextReader(LeafReaderContext readerContext) {

}

@Override
public void process(HitContext hitContext) {
public void process(HitContext hitContext) throws IOException {
hitExecute(searchContext, hitContext);
}
};
}

private void hitExecute(SearchContext context, HitContext hitContext) {
private void hitExecute(FetchContext context, HitContext hitContext) throws IOException {
TermVectorsFetchBuilder fetchSubPhaseBuilder = (TermVectorsFetchBuilder)context.getSearchExt(NAME);
if (fetchSubPhaseBuilder == null) {
return;
Expand All @@ -146,19 +142,18 @@ private void hitExecute(SearchContext context, HitContext hitContext) {
hitField = new DocumentField(NAME, new ArrayList<>(1));
hitContext.hit().setDocumentField(NAME, hitField);
}
TermVectorsRequest termVectorsRequest = new TermVectorsRequest(context.indexShard().shardId().getIndex().getName(),
hitContext.hit().getType(), hitContext.hit().getId());
TermVectorsResponse termVector = TermVectorsService.getTermVectors(context.indexShard(), termVectorsRequest);
try {
Terms terms = hitContext.reader().getTermVector(hitContext.docId(), field);
if (terms != null) {
TermsEnum te = terms.iterator();
Map<String, Integer> tv = new HashMap<>();
TermsEnum terms = termVector.getFields().terms(field).iterator();
BytesRef term;
while ((term = terms.next()) != null) {
tv.put(term.utf8ToString(), terms.postings(null, PostingsEnum.ALL).freq());
PostingsEnum pe = null;
while ((term = te.next()) != null) {
pe = te.postings(pe, PostingsEnum.FREQS);
pe.nextDoc();
tv.put(term.utf8ToString(), pe.freq());
}
hitField.getValues().add(tv);
} catch (IOException e) {
LogManager.getLogger(FetchSubPhasePluginIT.class).info("Swallowed exception", e);
}
}
}
Expand Down
Loading

0 comments on commit 63afc61

Please sign in to comment.