From fdb6fe50460558ffc5fdf298aec22499dd5456aa Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 20 Aug 2021 22:16:15 +0200 Subject: [PATCH] Remove Dead NamedWritableRegistry Fields in Aggs/Search Code (#76743) (#76774) No need for the registry in these places anymore => we can simplify things here and there. --- .../aggregations/TermsReduceBenchmark.java | 42 ++++++------------- .../search/QueryPhaseResultConsumer.java | 4 -- .../action/search/SearchPhaseController.java | 9 +--- .../search/TransportClearScrollAction.java | 5 +-- .../java/org/elasticsearch/node/Node.java | 3 +- .../action/search/DfsQueryPhaseTests.java | 2 +- .../action/search/FetchSearchPhaseTests.java | 18 +++----- .../search/QueryPhaseResultConsumerTests.java | 4 +- .../search/SearchPhaseControllerTests.java | 2 +- .../SearchQueryThenFetchAsyncActionTests.java | 24 ++++------- .../snapshots/SnapshotResiliencyTests.java | 5 +-- 11 files changed, 36 insertions(+), 82 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java index ea5134410e2d4..56ea700792c82 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java @@ -18,14 +18,11 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.BucketOrder; @@ -60,8 +57,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import static java.util.Collections.emptyList; - @Warmup(iterations = 5) @Measurement(iterations = 7) @BenchmarkMode(Mode.AverageTime) @@ -69,31 +64,21 @@ @State(Scope.Thread) @Fork(value = 1) public class TermsReduceBenchmark { - private final SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); - private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); - private final SearchPhaseController controller = new SearchPhaseController( - namedWriteableRegistry, - req -> new InternalAggregation.ReduceContextBuilder() { - @Override - public InternalAggregation.ReduceContext forPartialReduction() { - return InternalAggregation.ReduceContext.forPartialReduction(null, null, () -> PipelineAggregator.PipelineTree.EMPTY); - } + private final SearchPhaseController controller = new SearchPhaseController(req -> new InternalAggregation.ReduceContextBuilder() { + @Override + public InternalAggregation.ReduceContext forPartialReduction() { + return InternalAggregation.ReduceContext.forPartialReduction(null, null, () -> PipelineAggregator.PipelineTree.EMPTY); + } - @Override - public InternalAggregation.ReduceContext forFinalReduction() { - final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( - Integer.MAX_VALUE, - new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) - ); - return InternalAggregation.ReduceContext.forFinalReduction( - null, - null, - bucketConsumer, - PipelineAggregator.PipelineTree.EMPTY - ); - } + @Override + public InternalAggregation.ReduceContext forFinalReduction() { + final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + Integer.MAX_VALUE, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ); + return InternalAggregation.ReduceContext.forFinalReduction(null, null, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY); } - ); + }); @State(Scope.Benchmark) public static class TermsList extends AbstractList { @@ -203,7 +188,6 @@ public SearchPhaseController.ReducedQueryPhase reduceAggs(TermsList candidateLis new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, SearchProgressListener.NOOP, - namedWriteableRegistry, shards.size(), exc -> {} ); diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index 5067e296569c7..53e9e2d4a24b1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.io.stream.DelayableWriteable; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; @@ -57,7 +56,6 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults onPartialMergeFailure) { super(expectedResultSize); @@ -85,7 +82,6 @@ public QueryPhaseResultConsumer(SearchRequest request, this.controller = controller; this.progressListener = progressListener; this.aggReduceContextBuilder = controller.getReduceContext(request); - this.namedWriteableRegistry = namedWriteableRegistry; this.topNSize = getTopDocsSize(request); this.performFinalReduce = request.isFinalReduce(); this.onPartialMergeFailure = onPartialMergeFailure; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 983f4b288e2c6..31bbff1b22634 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -25,7 +25,6 @@ import org.apache.lucene.search.grouping.CollapseTopFieldDocs; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.collect.HppcMaps; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; @@ -63,12 +62,9 @@ public final class SearchPhaseController { private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; - private final NamedWriteableRegistry namedWriteableRegistry; private final Function requestToAggReduceContextBuilder; - public SearchPhaseController(NamedWriteableRegistry namedWriteableRegistry, - Function requestToAggReduceContextBuilder) { - this.namedWriteableRegistry = namedWriteableRegistry; + public SearchPhaseController(Function requestToAggReduceContextBuilder) { this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder; } @@ -597,8 +593,7 @@ QueryPhaseResultConsumer newSearchPhaseResults(Executor executor, SearchRequest request, int numShards, Consumer onPartialMergeFailure) { - return new QueryPhaseResultConsumer(request, executor, circuitBreaker, - this, listener, namedWriteableRegistry, numShards, onPartialMergeFailure); + return new QueryPhaseResultConsumer(request, executor, circuitBreaker, this, listener, numShards, onPartialMergeFailure); } static final class TopDocsStats { diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index 5320dd1e4cd12..103d9930d0fde 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; @@ -21,15 +20,13 @@ public class TransportClearScrollAction extends HandledTransportAction InternalAggregationTestCase.emptyReduceContextBuilder()); + return new SearchPhaseController(request -> InternalAggregationTestCase.emptyReduceContextBuilder()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 5b5f588ce641f..c4edbd240fad7 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -37,8 +37,7 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testShortcutQueryAndFetchOptimization() { - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE, new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP, @@ -83,8 +82,7 @@ public void run() { public void testFetchTwoDocument() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE, new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP, mockSearchPhaseContext.getRequest(), 2, exc -> {}); @@ -145,8 +143,7 @@ public void run() { public void testFailFetchOneDoc() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE, new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP, mockSearchPhaseContext.getRequest(), 2, exc -> {}); @@ -209,8 +206,7 @@ public void testFetchDocsConcurrently() throws InterruptedException { int resultSetSize = randomIntBetween(0, 100); // we use at least 2 hits otherwise this is subject to single shard optimization and we trip an assert... int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits); QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE, new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP, @@ -268,8 +264,7 @@ public void run() { public void testExceptionFailsPhase() { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE, new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP, @@ -328,8 +323,7 @@ public void run() { public void testCleanupIrrelevantContexts() { // contexts that are not fetched should be cleaned up MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), s -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(s -> InternalAggregationTestCase.emptyReduceContextBuilder()); QueryPhaseResultConsumer results = controller.newSearchPhaseResults(EsExecutors.DIRECT_EXECUTOR_SERVICE, new NoopCircuitBreaker(CircuitBreaker.REQUEST), SearchProgressListener.NOOP, mockSearchPhaseContext.getRequest(), 2, exc -> {}); diff --git a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java index e778af855f205..2246687b8941c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java @@ -47,7 +47,7 @@ public class QueryPhaseResultConsumerTests extends ESTestCase { @Before public void setup() { - searchPhaseController = new SearchPhaseController(writableRegistry(), + searchPhaseController = new SearchPhaseController( s -> new InternalAggregation.ReduceContextBuilder() { @Override public InternalAggregation.ReduceContext forPartialReduction() { @@ -85,7 +85,7 @@ public void testProgressListenerExceptionsAreCaught() throws Exception { AtomicReference onPartialMergeFailure = new AtomicReference<>(); QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(searchRequest, executor, new NoopCircuitBreaker(CircuitBreaker.REQUEST), searchPhaseController, searchProgressListener, - writableRegistry(), 10, e -> onPartialMergeFailure.accumulateAndGet(e, (prev, curr) -> { + 10, e -> onPartialMergeFailure.accumulateAndGet(e, (prev, curr) -> { curr.addSuppressed(prev); return curr; })); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 44ef02055e3b4..94ef815c9885c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -106,7 +106,7 @@ protected NamedWriteableRegistry writableRegistry() { @Before public void setup() { reductions = new CopyOnWriteArrayList<>(); - searchPhaseController = new SearchPhaseController(writableRegistry(), s -> new InternalAggregation.ReduceContextBuilder() { + searchPhaseController = new SearchPhaseController(s -> new InternalAggregation.ReduceContextBuilder() { @Override public ReduceContext forPartialReduction() { reductions.add(false); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index dfafb3fda6797..57a77b9ccb814 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -145,12 +145,10 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest searchRequest.source().collapse(new CollapseBuilder("collapse_field")); } searchRequest.allowPartialSearchResults(false); - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), - shardsIter.size(), exc -> {}); + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), shardsIter.size(), exc -> {}); SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), @@ -243,12 +241,10 @@ private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersio searchRequest.allowPartialSearchResults(false); SearchTransportService searchTransportService = new SearchTransportService(null, null, null); - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), - shardsIter.size(), exc -> {}); + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), shardsIter.size(), exc -> {}); final List responses = new ArrayList<>(); SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), @@ -338,12 +334,10 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest new Thread(() -> listener.onResponse(queryResult)).start(); } }; - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), - shardsIter.size(), exc -> {}); + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), shardsIter.size(), exc -> {}); CountDownLatch latch = new CountDownLatch(1); SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), @@ -438,12 +432,10 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest new Thread(() -> listener.onResponse(queryResult)).start(); } }; - SearchPhaseController controller = new SearchPhaseController( - writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchPhaseController controller = new SearchPhaseController(r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), - shardsIter.size(), exc -> {}); + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), shardsIter.size(), exc -> {}); CountDownLatch latch = new CountDownLatch(1); SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 3f6040c4722f9..56edffa002fec 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1964,10 +1964,7 @@ protected void assertSnapshotOrGenericThread() { new NoneCircuitBreakerService(), EmptySystemIndices.INSTANCE.getExecutorSelector() ); - SearchPhaseController searchPhaseController = new SearchPhaseController( - writableRegistry(), - searchService::aggReduceContextBuilder - ); + SearchPhaseController searchPhaseController = new SearchPhaseController(searchService::aggReduceContextBuilder); actions.put( SearchAction.INSTANCE, new TransportSearchAction(