Skip to content

Commit

Permalink
Remove Dead NamedWritableRegistry Fields in Aggs/Search Code (#76743) (
Browse files Browse the repository at this point in the history
…#76774)

No need for the registry in these places anymore => we can simplify things here and there.
  • Loading branch information
original-brownbear authored Aug 20, 2021
1 parent 7e8ea78 commit fdb6fe5
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
Expand Down Expand Up @@ -60,40 +57,28 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;

@Warmup(iterations = 5)
@Measurement(iterations = 7)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 1)
public class TermsReduceBenchmark {
private final SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
private final SearchPhaseController controller = new SearchPhaseController(
namedWriteableRegistry,
req -> new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(null, null, () -> PipelineAggregator.PipelineTree.EMPTY);
}
private final SearchPhaseController controller = new SearchPhaseController(req -> new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(null, null, () -> PipelineAggregator.PipelineTree.EMPTY);
}

@Override
public InternalAggregation.ReduceContext forFinalReduction() {
final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
return InternalAggregation.ReduceContext.forFinalReduction(
null,
null,
bucketConsumer,
PipelineAggregator.PipelineTree.EMPTY
);
}
@Override
public InternalAggregation.ReduceContext forFinalReduction() {
final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
return InternalAggregation.ReduceContext.forFinalReduction(null, null, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY);
}
);
});

@State(Scope.Benchmark)
public static class TermsList extends AbstractList<InternalAggregations> {
Expand Down Expand Up @@ -203,7 +188,6 @@ public SearchPhaseController.ReducedQueryPhase reduceAggs(TermsList candidateLis
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
controller,
SearchProgressListener.NOOP,
namedWriteableRegistry,
shards.size(),
exc -> {}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
Expand Down Expand Up @@ -57,7 +56,6 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
private final SearchPhaseController controller;
private final SearchProgressListener progressListener;
private final ReduceContextBuilder aggReduceContextBuilder;
private final NamedWriteableRegistry namedWriteableRegistry;

private final int topNSize;
private final boolean hasTopDocs;
Expand All @@ -76,7 +74,6 @@ public QueryPhaseResultConsumer(SearchRequest request,
CircuitBreaker circuitBreaker,
SearchPhaseController controller,
SearchProgressListener progressListener,
NamedWriteableRegistry namedWriteableRegistry,
int expectedResultSize,
Consumer<Exception> onPartialMergeFailure) {
super(expectedResultSize);
Expand All @@ -85,7 +82,6 @@ public QueryPhaseResultConsumer(SearchRequest request,
this.controller = controller;
this.progressListener = progressListener;
this.aggReduceContextBuilder = controller.getReduceContext(request);
this.namedWriteableRegistry = namedWriteableRegistry;
this.topNSize = getTopDocsSize(request);
this.performFinalReduce = request.isFinalReduce();
this.onPartialMergeFailure = onPartialMergeFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
Expand Down Expand Up @@ -63,12 +62,9 @@
public final class SearchPhaseController {
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

private final NamedWriteableRegistry namedWriteableRegistry;
private final Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;

public SearchPhaseController(NamedWriteableRegistry namedWriteableRegistry,
Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder) {
this.namedWriteableRegistry = namedWriteableRegistry;
public SearchPhaseController(Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder) {
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
}

Expand Down Expand Up @@ -597,8 +593,7 @@ QueryPhaseResultConsumer newSearchPhaseResults(Executor executor,
SearchRequest request,
int numShards,
Consumer<Exception> onPartialMergeFailure) {
return new QueryPhaseResultConsumer(request, executor, circuitBreaker,
this, listener, namedWriteableRegistry, numShards, onPartialMergeFailure);
return new QueryPhaseResultConsumer(request, executor, circuitBreaker, this, listener, numShards, onPartialMergeFailure);
}

static final class TopDocsStats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,20 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {

private final ClusterService clusterService;
private final SearchTransportService searchTransportService;
private final NamedWriteableRegistry namedWriteableRegistry;

@Inject
public TransportClearScrollAction(TransportService transportService, ClusterService clusterService, ActionFilters actionFilters,
SearchTransportService searchTransportService, NamedWriteableRegistry namedWriteableRegistry) {
SearchTransportService searchTransportService) {
super(ClearScrollAction.NAME, transportService, actionFilters, ClearScrollRequest::new);
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
this.namedWriteableRegistry = namedWriteableRegistry;
}

@Override
Expand Down
3 changes: 1 addition & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,7 @@ protected Node(final Environment initialEnvironment,
b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(
namedWriteableRegistry, searchService::aggReduceContextBuilder));
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,6 @@ public void run() throws IOException {
}

private SearchPhaseController searchPhaseController() {
return new SearchPhaseController(writableRegistry(), request -> InternalAggregationTestCase.emptyReduceContextBuilder());
return new SearchPhaseController(request -> InternalAggregationTestCase.emptyReduceContextBuilder());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
public class FetchSearchPhaseTests extends ESTestCase {

public void testShortcutQueryAndFetchOptimization() {
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder());
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP,
Expand Down Expand Up @@ -83,8 +82,7 @@ public void run() {

public void testFetchTwoDocument() {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder());
QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP,
mockSearchPhaseContext.getRequest(), 2, exc -> {});
Expand Down Expand Up @@ -145,8 +143,7 @@ public void run() {

public void testFailFetchOneDoc() {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder());
QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP,
mockSearchPhaseContext.getRequest(), 2, exc -> {});
Expand Down Expand Up @@ -209,8 +206,7 @@ public void testFetchDocsConcurrently() throws InterruptedException {
int resultSetSize = randomIntBetween(0, 100);
// we use at least 2 hits otherwise this is subject to single shard optimization and we trip an assert...
int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder());
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits);
QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP,
Expand Down Expand Up @@ -268,8 +264,7 @@ public void run() {

public void testExceptionFailsPhase() {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder());
QueryPhaseResultConsumer results =
controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP,
Expand Down Expand Up @@ -328,8 +323,7 @@ public void run() {

public void testCleanupIrrelevantContexts() { // contexts that are not fetched should be cleaned up
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder());
QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE,
new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP,
mockSearchPhaseContext.getRequest(), 2, exc -> {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class QueryPhaseResultConsumerTests extends ESTestCase {

@Before
public void setup() {
searchPhaseController = new SearchPhaseController(writableRegistry(),
searchPhaseController = new SearchPhaseController(
s -> new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
Expand Down Expand Up @@ -85,7 +85,7 @@ public void testProgressListenerExceptionsAreCaught() throws Exception {
AtomicReference<Exception> onPartialMergeFailure = new AtomicReference<>();
QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(searchRequest, executor,
new NoopCircuitBreaker(CircuitBreaker.REQUEST), searchPhaseController, searchProgressListener,
writableRegistry(), 10, e -> onPartialMergeFailure.accumulateAndGet(e, (prev, curr) -> {
10, e -> onPartialMergeFailure.accumulateAndGet(e, (prev, curr) -> {
curr.addSuppressed(prev);
return curr;
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected NamedWriteableRegistry writableRegistry() {
@Before
public void setup() {
reductions = new CopyOnWriteArrayList<>();
searchPhaseController = new SearchPhaseController(writableRegistry(), s -> new InternalAggregation.ReduceContextBuilder() {
searchPhaseController = new SearchPhaseController(s -> new InternalAggregation.ReduceContextBuilder() {
@Override
public ReduceContext forPartialReduction() {
reductions.add(false);
Expand Down
Loading

0 comments on commit fdb6fe5

Please sign in to comment.