Skip to content

Commit

Permalink
Move Aggregator#buildTopLevel() to search worker thread. (elastic#98715)
Browse files Browse the repository at this point in the history
This commit introduces an AggregatorCollector that contains a finish method which performs aggregation 
post-collection and builds the internal aggregation for this collector. This method is called on the worker 
thread at the end of the collection phase.
  • Loading branch information
iverase authored Sep 19, 2023
1 parent 92458fc commit 4bc1afd
Show file tree
Hide file tree
Showing 18 changed files with 293 additions and 224 deletions.
2 changes: 1 addition & 1 deletion docs/reference/search/profile.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ The API returns the following result:
"time_in_nanos": 22577
},
{
"name": "BucketCollectorWrapper: [BucketCollectorWrapper[bucketCollector=[my_scoped_agg, my_global_agg]]]",
"name": "AggregatorCollector: [my_scoped_agg, my_global_agg]",
"reason": "aggregation",
"time_in_nanos": 867617
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@
*/
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.QueryPhase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

Expand All @@ -31,36 +28,30 @@ public static void preProcess(SearchContext context) {
if (context.aggregations() == null) {
return;
}
final Supplier<Collector> collectorSupplier;
final Supplier<AggregatorCollector> collectorSupplier;
if (context.aggregations().isInSortOrderExecutionRequired()) {
executeInSortOrder(context, newBucketCollector(context));
collectorSupplier = () -> BucketCollector.NO_OP_COLLECTOR;
AggregatorCollector collector = newAggregatorCollector(context);
executeInSortOrder(context, collector.bucketCollector);
collectorSupplier = () -> new AggregatorCollector(collector.aggregators, BucketCollector.NO_OP_BUCKET_COLLECTOR);
} else {
collectorSupplier = () -> newBucketCollector(context).asCollector();
collectorSupplier = () -> newAggregatorCollector(context);
}
context.aggregations().registerAggsCollectorManager(new CollectorManager<>() {
@Override
public Collector newCollector() {
return collectorSupplier.get();
}

@Override
public Void reduce(Collection<Collector> collectors) {
// we cannot run post-collection method here because we need to do it after the optional timeout
// has been removed from the index searcher. Therefore, we delay this processing to the
// AggregationPhase#execute method.
return null;
}
});
context.aggregations()
.registerAggsCollectorManager(
new AggregatorCollectorManager(
collectorSupplier,
internalAggregations -> context.queryResult().aggregations(internalAggregations),
() -> context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
)
);
}

private static BucketCollector newBucketCollector(SearchContext context) {
private static AggregatorCollector newAggregatorCollector(SearchContext context) {
try {
Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators();
context.aggregations().aggregators(aggregators);
BucketCollector bucketCollector = MultiBucketCollector.wrap(true, List.of(aggregators));
bucketCollector.preCollection();
return bucketCollector;
return new AggregatorCollector(aggregators, bucketCollector);
} catch (IOException e) {
throw new AggregationInitializationException("Could not initialize aggregators", e);
}
Expand Down Expand Up @@ -96,48 +87,4 @@ private static List<Runnable> getCancellationChecks(SearchContext context) {

return cancellationChecks;
}

public static void execute(SearchContext context) {
if (context.aggregations() == null) {
context.queryResult().aggregations(null);
return;
}

if (context.queryResult().hasAggs()) {
// no need to compute the aggs twice, they should be computed on a per context basis
return;
}

final List<InternalAggregations> internalAggregations = new ArrayList<>(context.aggregations().aggregators().size());
for (Aggregator[] aggregators : context.aggregations().aggregators()) {
final List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
for (Aggregator aggregator : aggregators) {
try {
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
// release the aggregator to claim the used bytes as we don't need it anymore
aggregator.releaseAggregations();
}
internalAggregations.add(InternalAggregations.from(aggregations));
}

if (internalAggregations.size() > 1) {
// we execute this search using more than one slice. In order to keep memory requirements
// low, we do a partial reduction here.
context.queryResult()
.aggregations(
InternalAggregations.topLevelReduce(
internalAggregations,
context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
)
);
} else {
context.queryResult().aggregations(internalAggregations.get(0));
}

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.search.internal.TwoPhaseCollector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Collector that controls the life cycle of an aggregation document collection. */
public class AggregatorCollector implements TwoPhaseCollector {
final Aggregator[] aggregators;
final BucketCollector bucketCollector;
final List<InternalAggregation> internalAggregations;

public AggregatorCollector(Aggregator[] aggregators, BucketCollector bucketCollector) {
this.aggregators = aggregators;
this.bucketCollector = bucketCollector;
this.internalAggregations = new ArrayList<>(aggregators.length);
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null, null));
}

@Override
public ScoreMode scoreMode() {
return bucketCollector.scoreMode();
}

@Override
public void doPostCollection() throws IOException {
bucketCollector.postCollection();
for (Aggregator aggregator : aggregators) {
internalAggregations.add(aggregator.buildTopLevel());
// release the aggregator to claim the used bytes as we don't need it anymore
aggregator.releaseAggregations();
}
}

@Override
public String toString() {
String[] aggNames = new String[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
aggNames[i] = aggregators[i].name();
}
return Arrays.toString(aggNames);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.CollectorManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Collector manager that produces {@link AggregatorCollector} and merges them during the reduce phase. */
public class AggregatorCollectorManager implements CollectorManager<AggregatorCollector, Void> {

private final Supplier<AggregatorCollector> collectorSupplier;
private final Consumer<InternalAggregations> internalAggregationsConsumer;
private final Supplier<AggregationReduceContext> reduceContextSupplier;

public AggregatorCollectorManager(
Supplier<AggregatorCollector> collectorSupplier,
Consumer<InternalAggregations> internalAggregationsConsumer,
Supplier<AggregationReduceContext> reduceContextSupplier
) {
this.collectorSupplier = collectorSupplier;
this.internalAggregationsConsumer = internalAggregationsConsumer;
this.reduceContextSupplier = reduceContextSupplier;
}

@Override
public AggregatorCollector newCollector() throws IOException {
return collectorSupplier.get();
}

@Override
public Void reduce(Collection<AggregatorCollector> collectors) throws IOException {
if (collectors.size() > 1) {
// we execute this search using more than one slice. In order to keep memory requirements
// low, we do a partial reduction here.
final List<InternalAggregations> internalAggregations = new ArrayList<>(collectors.size());
collectors.forEach(c -> internalAggregations.add(InternalAggregations.from(c.internalAggregations)));
internalAggregationsConsumer.accept(InternalAggregations.topLevelReduce(internalAggregations, reduceContextSupplier.get()));
} else if (collectors.size() == 1) {
internalAggregationsConsumer.accept(InternalAggregations.from(collectors.iterator().next().internalAggregations));
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@
*/
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
Expand All @@ -21,8 +18,7 @@ public class SearchContextAggregations {

private final AggregatorFactories factories;
private final Supplier<AggregationReduceContext.Builder> toAggregationReduceContextBuilder;
private final List<Aggregator[]> aggregators;
private CollectorManager<Collector, Void> aggCollectorManager;
private CollectorManager<AggregatorCollector, Void> aggCollectorManager;

/**
* Creates a new aggregation context with the parsed aggregator factories
Expand All @@ -33,37 +29,23 @@ public SearchContextAggregations(
) {
this.factories = factories;
this.toAggregationReduceContextBuilder = toAggregationReduceContextBuilder;
this.aggregators = new ArrayList<>();
}

public AggregatorFactories factories() {
return factories;
}

public List<Aggregator[]> aggregators() {
return aggregators;
}

/**
* Registers all the created aggregators (top level aggregators) for the search execution context.
*
* @param aggregators The top level aggregators of the search execution.
*/
public void aggregators(Aggregator[] aggregators) {
this.aggregators.add(aggregators);
}

/**
* Registers the collector to be run for the aggregations phase
*/
public void registerAggsCollectorManager(CollectorManager<Collector, Void> aggCollectorManager) {
public void registerAggsCollectorManager(CollectorManager<AggregatorCollector, Void> aggCollectorManager) {
this.aggCollectorManager = aggCollectorManager;
}

/**
* Returns the collector to be run for the aggregations phase
*/
public CollectorManager<Collector, Void> getAggsCollectorManager() {
public CollectorManager<AggregatorCollector, Void> getAggsCollectorManager() {
return aggCollectorManager;
}

Expand Down
Loading

0 comments on commit 4bc1afd

Please sign in to comment.