Skip to content

Commit

Permalink
Add support for CollectorManager wrapping with profiling with concurr…
Browse files Browse the repository at this point in the history
…ent aggregation

Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Aug 5, 2023
1 parent 1d28fac commit ee875a2
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.common.CheckedFunction;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
Expand Down Expand Up @@ -42,12 +41,16 @@ class AggregationCollectorManager implements CollectorManager<Collector, Reducea

@Override
public Collector newCollector() throws IOException {
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason);
final Collector collector = createCollector(aggProvider.apply(context));
// For Aggregations we should not have a NO_OP_Collector
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
}

public String getCollectorReason() {
return collectorReason;

Check warning on line 51 in server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java#L51

Added line #L51 was not covered by tests
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
Expand Down Expand Up @@ -77,17 +80,9 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
}
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
static Collector createCollector(List<Aggregator> collectors) throws IOException {
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector) collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(
collector,
reason,
// TODO: report on child aggs as well
Collections.emptyList()
);
}
return collector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public void postProcess(SearchContext context) {
try {
if (globalCollectorManager != null) {
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()
);
if (context.getProfilers() != null) {
globalCollectorManager = new InternalProfileCollectorManager(

Check warning on line 69 in server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java#L69

Added line #L69 was not covered by tests
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()

Check warning on line 72 in server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java#L72

Added line #L72 was not covered by tests
);
context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager);
}
final ReduceableSearchResult result = context.searcher().search(query, globalCollectorManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.QueryPhaseExecutionException;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -74,7 +75,13 @@ public void postProcess(SearchContext context) {
if (context.getProfilers() != null) {
context.getProfilers()
.addQueryProfiler()
.setCollector((InternalProfileComponent) globalCollectorManager.newCollector());
.setCollector(

Check warning on line 78 in server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java#L78

Added line #L78 was not covered by tests
new InternalProfileCollector(
globalCollectorManager.newCollector(),
globalCollectorManager.getCollectorReason(),
Collections.emptyList()

Check warning on line 82 in server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java#L80-L82

Added lines #L80 - L82 were not covered by tests
)
);
}
context.searcher().search(query, globalCollectorManager.newCollector());
globalCollectorManager.reduce(List.of()).reduce(context.queryResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.search.Weight;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.common.lucene.search.FilteredCollector;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.profile.query.InternalProfileCollectorManager;

Expand Down Expand Up @@ -99,11 +100,15 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
*/
protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) throws IOException {
final CollectorManager<? extends Collector, ReduceableSearchResult> manager = createManager(in);
return new InternalProfileCollectorManager(
manager,
profilerName,
in != null ? Collections.singletonList(in) : Collections.emptyList()
);
if (manager instanceof InternalProfileCollectorManager) {
return (InternalProfileCollectorManager) manager;

Check warning on line 104 in server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java#L104

Added line #L104 was not covered by tests
} else {
return new InternalProfileCollectorManager(
manager,
profilerName,
in != null ? Collections.singletonList(in) : Collections.emptyList()
);
}
}

/**
Expand Down Expand Up @@ -198,16 +203,33 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i

for (CollectorManager<? extends Collector, ReduceableSearchResult> manager : subs) {
final Collector collector = manager.newCollector();
if (!(collector instanceof InternalProfileCollector)) {
throw new IllegalArgumentException("non-profiling collector");
}
subCollectors.add((InternalProfileCollector) collector);
subCollectors.add(new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, Collections.emptyList()));

Check warning on line 206 in server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java#L206

Added line #L206 was not covered by tests
}

final Collector collector = MultiCollector.wrap(subCollectors);
return new InternalProfileCollector(collector, REASON_SEARCH_MULTI, subCollectors);
}

@Override
protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) {
final List<CollectorManager<?, ReduceableSearchResult>> managers = new ArrayList<>();
final List<InternalProfileCollectorManager> children = new ArrayList<>();
managers.add(in);
children.add(in);

Check warning on line 218 in server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java#L215-L218

Added lines #L215 - L218 were not covered by tests
for (CollectorManager<? extends Collector, ReduceableSearchResult> manager : subs) {
InternalProfileCollectorManager subCollectorManager = new InternalProfileCollectorManager(

Check warning on line 220 in server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java#L220

Added line #L220 was not covered by tests
manager,
CollectorResult.REASON_AGGREGATION,
Collections.emptyList()

Check warning on line 223 in server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java#L223

Added line #L223 was not covered by tests
);
managers.add(subCollectorManager);
children.add(subCollectorManager);
}
CollectorManager<? extends Collector, ReduceableSearchResult> multiCollectorManager = QueryCollectorManagerContext
.createMultiCollectorManager(managers);
return new InternalProfileCollectorManager(multiCollectorManager, REASON_SEARCH_MULTI, children);

Check warning on line 230 in server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java#L225-L230

Added lines #L225 - L230 were not covered by tests
}

@Override
CollectorManager<? extends Collector, ReduceableSearchResult> createManager(
CollectorManager<? extends Collector, ReduceableSearchResult> in
Expand Down

0 comments on commit ee875a2

Please sign in to comment.