diff --git a/CHANGELOG.md b/CHANGELOG.md index 27c2386e9cecb..4bd5da5f06166 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -115,6 +115,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix painless casting bug, which crashes the OpenSearch process ([#8315](https://github.com/opensearch-project/OpenSearch/pull/8315)) - Optimize Metadata build() to skip redundant computations as part of ClusterState build ([#7853](https://github.com/opensearch-project/OpenSearch/pull/7853)) - Make Span exporter configurable ([#8620](https://github.com/opensearch-project/OpenSearch/issues/8620)) +- Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index e83bfe8486904..ee29d6bfe2b62 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -65,6 +65,7 @@ import org.opensearch.index.search.NestedHelper; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; @@ -176,7 +177,7 @@ final class DefaultSearchContext extends SearchContext { private SuggestionSearchContext suggest; private List rescore; private Profilers profilers; - + private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR; private final Map searchExtBuilders = new HashMap<>(); private final Map, CollectorManager> queryCollectorManagers = new HashMap<>(); private final QueryShardContext 
queryShardContext; @@ -919,4 +920,14 @@ public ReaderContext readerContext() { public InternalAggregation.ReduceContext partial() { return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction(); } + + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + this.bucketCollectorProcessor = bucketCollectorProcessor; + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return bucketCollectorProcessor; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index ab80aed5c3200..6058508a6e279 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -17,11 +17,8 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Deque; -import java.util.LinkedList; import java.util.List; /** @@ -53,31 +50,12 @@ public Collector newCollector() throws IOException { @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { - List aggregators = new ArrayList<>(); - - final Deque allCollectors = new LinkedList<>(collectors); - while (!allCollectors.isEmpty()) { - final Collector currentCollector = allCollectors.pop(); - if (currentCollector instanceof Aggregator) { - aggregators.add((Aggregator) currentCollector); - } else if (currentCollector instanceof InternalProfileCollector) { - if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { - aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector()); - } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) 
{ - allCollectors.addAll( - Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) - ); - } - } else if (currentCollector instanceof MultiBucketCollector) { - allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); - } - } - + final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); final List internals = new ArrayList<>(aggregators.size()); context.aggregations().resetBucketMultiConsumer(); for (Aggregator aggregator : aggregators) { try { - aggregator.postCollection(); + // post collection is called in ContextIndexSearcher after search on leaves are completed internals.add(aggregator.buildTopLevel()); } catch (IOException e) { throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java new file mode 100644 index 0000000000000..352ecf8bc94ad --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.aggregations; + +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.MultiCollector; +import org.opensearch.common.lucene.MinimumScoreCollector; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.profile.query.InternalProfileCollector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +/** + * Processor to perform collector level processing specific to {@link BucketCollector} in different stages like: a) PostCollection + * after search on each leaf is completed and b) process the collectors to perform reduce after collection is completed + */ +public class BucketCollectorProcessor { + + /** + * Performs {@link BucketCollector#postCollection()} on all the {@link BucketCollector} in the given {@link Collector} collector tree + * after the collection of documents on a leaf is completed. This method will be called by different slice threads on its own collector + * tree instance in case of concurrent segment search such that postCollection happens on the same slice thread which initialize and + * perform collection of the documents for a leaf segment. For sequential search case, there is always a single search thread which + * performs both collection and postCollection on {@link BucketCollector}. + *
+ * <p>
+ * This was originally done in {@link org.opensearch.search.aggregations.AggregationProcessor#postProcess(SearchContext)}. But with + * concurrent segment search path this needs to be performed here. There are AssertingCodecs in lucene which validates that the + * DocValues created for a field is always used by the same thread for a request. In concurrent segment search case, the DocValues + * gets initialized on different threads for different segments (or slices). Whereas the postProcess happens as part of reduce phase + * and is performed on the separate thread which is from search threadpool and not from slice threadpool. So two different threads + * performs the access on the DocValues causing the AssertingCodec to fail. From functionality perspective, there is no issue as + * DocValues for each segment is always accessed by a single thread at a time but those threads may be different (e.g. slice thread + * during collection and then search thread during reduce) + *
+ * <p>
+ *
+ * <p>
+ * NOTE: We can evaluate and deprecate this postCollection processing once lucene release the changes described in the
+ * <a href="https://github.com/apache/lucene/issues/12375">issue-12375</a>. With this new change we should be able to implement
+ * {@link BucketCollector#postCollection()} functionality using the lucene interface directly such that postCollection gets called
+ * from the slice thread by lucene itself
+ *
+ * <p>
+ * @param collectorTree collector tree used by calling thread + */ + public void processPostCollection(Collector collectorTree) throws IOException { + final Queue collectors = new LinkedList<>(); + collectors.offer(collectorTree); + while (!collectors.isEmpty()) { + Collector currentCollector = collectors.poll(); + if (currentCollector instanceof InternalProfileCollector) { + collectors.offer(((InternalProfileCollector) currentCollector).getCollector()); + } else if (currentCollector instanceof MinimumScoreCollector) { + collectors.offer(((MinimumScoreCollector) currentCollector).getCollector()); + } else if (currentCollector instanceof MultiCollector) { + for (Collector innerCollector : ((MultiCollector) currentCollector).getCollectors()) { + collectors.offer(innerCollector); + } + } else if (currentCollector instanceof BucketCollector) { + ((BucketCollector) currentCollector).postCollection(); + } + } + } + + /** + * Unwraps the input collection of {@link Collector} to get the list of the {@link Aggregator} used by different slice threads. The + * input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager} + * during the reduce phase. 
This list of {@link Aggregator} is used to create {@link InternalAggregation} and optionally perform + * reduce at shard level before returning response to coordinator + * @param collectors collection of aggregation collectors to reduce + * @return list of unwrapped {@link Aggregator} + */ + public List toAggregators(Collection collectors) { + List aggregators = new ArrayList<>(); + + final Deque allCollectors = new LinkedList<>(collectors); + while (!allCollectors.isEmpty()) { + final Collector currentCollector = allCollectors.pop(); + if (currentCollector instanceof Aggregator) { + aggregators.add((Aggregator) currentCollector); + } else if (currentCollector instanceof InternalProfileCollector) { + if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { + aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector()); + } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { + allCollectors.addAll( + Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) + ); + } + } else if (currentCollector instanceof MultiBucketCollector) { + allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); + } + } + return aggregators; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java index 592fb8cc6e674..336ad8739eb41 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java @@ -28,12 +28,16 @@ * avoid the increase in aggregation result sets returned by each shard to coordinator where final reduce happens for results received from * all the shards */ -public class 
ConcurrentAggregationProcessor extends DefaultAggregationProcessor { +public class ConcurrentAggregationProcessor implements AggregationProcessor { + + private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor(); @Override public void preProcess(SearchContext context) { try { if (context.aggregations() != null) { + // update the bucket collector process as there is aggregation in the request + context.setBucketCollectorProcessor(bucketCollectorProcessor); if (context.aggregations().factories().hasNonGlobalAggregator()) { context.queryCollectorManagers().put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManager(context)); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java index 05aa4a9acb270..24b05ebcf3a61 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java @@ -24,10 +24,14 @@ */ public class DefaultAggregationProcessor implements AggregationProcessor { + private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor(); + @Override public void preProcess(SearchContext context) { try { if (context.aggregations() != null) { + // update the bucket collector process as there is aggregation in the request + context.setBucketCollectorProcessor(bucketCollectorProcessor); if (context.aggregations().factories().hasNonGlobalAggregator()) { context.queryCollectorManagers() .put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManagerWithSingleCollector(context)); diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 2f1c5475c9cf6..e3ca932eb4699 100644 --- 
a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -276,6 +276,7 @@ protected void search(List leaves, Weight weight, Collector c searchLeaf(leaves.get(i), weight, collector); } } + searchContext.bucketCollectorProcessor().processPostCollection(collector); } /** diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 9bfc0e8b6fea5..790d2ed5ee4b7 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -50,6 +50,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.collapse.CollapseContext; @@ -548,4 +549,14 @@ public ReaderContext readerContext() { public InternalAggregation.ReduceContext partial() { return in.partial(); } + + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + in.setBucketCollectorProcessor(bucketCollectorProcessor); + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return in.bucketCollectorProcessor(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 319a5624bbf56..fd02ba2ba12bb 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -53,6 +53,8 @@ 
import org.opensearch.search.RescoreDocIds; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.collapse.CollapseContext; @@ -73,6 +75,7 @@ import org.opensearch.search.sort.SortAndFormats; import org.opensearch.search.suggest.SuggestionSearchContext; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -94,6 +97,20 @@ public abstract class SearchContext implements Releasable { public static final int TRACK_TOTAL_HITS_DISABLED = -1; public static final int DEFAULT_TRACK_TOTAL_HITS_UP_TO = 10000; + // no-op bucket collector processor + public static final BucketCollectorProcessor NO_OP_BUCKET_COLLECTOR_PROCESSOR = new BucketCollectorProcessor() { + @Override + public void processPostCollection(Collector collectorTree) { + // do nothing as there is no aggregation collector + } + + @Override + public List toAggregators(Collection collectors) { + // should not be called when there is no aggregation collector + throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR"); + } + }; + private final List releasables = new CopyOnWriteArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(false); private InnerHitsContext innerHitsContext; @@ -449,4 +466,9 @@ public String toString() { public abstract ReaderContext readerContext(); public abstract InternalAggregation.ReduceContext partial(); + + // processor used for bucket collectors + public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor); + + public abstract BucketCollectorProcessor bucketCollectorProcessor(); } diff --git 
a/server/src/test/java/org/opensearch/search/SearchCancellationTests.java b/server/src/test/java/org/opensearch/search/SearchCancellationTests.java index e67123bf2c51e..011723da36a30 100644 --- a/server/src/test/java/org/opensearch/search/SearchCancellationTests.java +++ b/server/src/test/java/org/opensearch/search/SearchCancellationTests.java @@ -48,6 +48,7 @@ import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.automaton.CompiledAutomaton; import org.apache.lucene.util.automaton.RegExp; +import org.junit.Before; import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.search.internal.ContextIndexSearcher; @@ -71,6 +72,7 @@ public class SearchCancellationTests extends OpenSearchTestCase { private static Directory dir; private static IndexReader reader; + private SearchContext searchContext; @BeforeClass public static void setup() throws IOException { @@ -106,6 +108,12 @@ public static void cleanup() throws IOException { reader = null; } + @Before + public void testSetup() { + searchContext = mock(SearchContext.class); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); + } + public void testAddingCancellationActions() throws IOException { ContextIndexSearcher searcher = new ContextIndexSearcher( reader, @@ -114,7 +122,7 @@ public void testAddingCancellationActions() throws IOException { IndexSearcher.getDefaultQueryCachingPolicy(), true, null, - mock(SearchContext.class) + searchContext ); NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null)); assertEquals("cancellation runnable should not be null", npe.getMessage()); @@ -128,7 +136,6 @@ public void testAddingCancellationActions() throws IOException { public void testCancellableCollector() throws IOException { TotalHitCountCollector collector1 = new TotalHitCountCollector(); Runnable cancellation = () -> { throw new 
TaskCancelledException("cancelled"); }; - SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); ContextIndexSearcher searcher = new ContextIndexSearcher( @@ -167,7 +174,7 @@ public void testExitableDirectoryReader() throws IOException { IndexSearcher.getDefaultQueryCachingPolicy(), true, null, - mock(SearchContext.class) + searchContext ); searcher.addQueryCancellation(cancellation); CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton()); diff --git a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java index c9755cec2a33d..308fa5ee18072 100644 --- a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java @@ -259,6 +259,7 @@ public void onRemoval(ShardId shardId, Accountable accountable) { SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); ContextIndexSearcher searcher = new ContextIndexSearcher( filteredReader, IndexSearcher.getDefaultSimilarity(), diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index f0f692e15f066..528d65bcc5ef2 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -120,6 +120,7 @@ public void setUp() throws Exception { SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = 
mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); searcher = new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java index cf4d7798056c4..61b78905334ec 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java @@ -1207,6 +1207,7 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), @@ -1223,6 +1224,7 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java index 9c27783263c5a..6d30d7993c850 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java @@ -1427,6 +1427,7 @@ private static ContextIndexSearcher newContextSearcher(IndexReader 
reader, Execu SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), @@ -1443,6 +1444,7 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index cf0f9dbb04f7f..7cc9ffa408173 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -349,6 +349,7 @@ public boolean shouldCache(Query query) { when(searchContext.indexShard()).thenReturn(indexShard); when(searchContext.aggregations()).thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer)); when(searchContext.query()).thenReturn(query); + when(searchContext.bucketCollectorProcessor()).thenReturn(new BucketCollectorProcessor()); /* * Always use the circuit breaking big arrays instance so that the CircuitBreakerService * we're passed gets a chance to break. 
diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index 0f2433e1af04e..1561cf32e3ef6 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -52,6 +52,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.collapse.CollapseContext; @@ -116,6 +117,7 @@ public class TestSearchContext extends SearchContext { private Profilers profilers; private CollapseContext collapse; protected boolean concurrentSegmentSearchEnabled; + private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR; /** * Sets the concurrent segment search enabled field @@ -667,6 +669,16 @@ public InternalAggregation.ReduceContext partial() { return InternalAggregationTestCase.emptyReduceContextBuilder().forPartialReduction(); } + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + this.bucketCollectorProcessor = bucketCollectorProcessor; + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return bucketCollectorProcessor; + } + /** * Clean the query results by consuming all of it */