From ff6607d25f023a59f866a66820037bb215342ca8 Mon Sep 17 00:00:00 2001 From: Ishan Chattopadhyaya Date: Tue, 7 May 2024 00:19:37 +0530 Subject: [PATCH] SOLR-13350: Multithreaded search (closes #2248) --- solr/CHANGES.txt | 2 +- solr/bin/solr | 3 + .../org/apache/solr/core/CoreContainer.java | 15 + .../java/org/apache/solr/core/NodeConfig.java | 18 + .../org/apache/solr/core/SolrXmlConfig.java | 3 + .../handler/component/QueryComponent.java | 3 + .../solr/search/MultiThreadedSearcher.java | 423 ++++++++++++++++++ .../org/apache/solr/search/QueryCommand.java | 10 + .../apache/solr/search/SolrIndexSearcher.java | 134 ++++-- .../search/SolrMultiCollectorManager.java | 155 +++++++ solr/core/src/test-files/solr/solr-50-all.xml | 1 + .../src/test-files/solr/solr-stress-new.xml | 1 + .../solr/solr-trackingshardhandler.xml | 1 + solr/core/src/test-files/solr/solr.xml | 1 + .../solr/search/TestCpuAllowedLimit.java | 8 +- .../org/apache/solr/search/TestFiltering.java | 18 +- .../apache/solr/search/TestQueryLimits.java | 12 +- solr/server/solr/solr.xml | 1 + .../pages/configuring-solr-xml.adoc | 9 + 19 files changed, 772 insertions(+), 46 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java create mode 100644 solr/core/src/java/org/apache/solr/search/SolrMultiCollectorManager.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 3251785592d..8d4c0e5d30f 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -98,7 +98,7 @@ Other Changes ================== 9.7.0 ================== New Features --------------------- -(No changes) +* SOLR-13350: Multithreaded search execution (Ishan Chattopadhyaya, Mark Miller, Christine Poerschke, David Smiley, noble) Improvements --------------------- diff --git a/solr/bin/solr b/solr/bin/solr index 69686f6e286..a2b9445604c 100755 --- a/solr/bin/solr +++ b/solr/bin/solr @@ -1441,6 +1441,9 @@ if [ $# -gt 0 ]; then done fi +# Setting number of threads for search +if ! command -v nproc &> /dev/null; then echo "Couldn't determine number of CPUs, using default number of search threads"; else cpus=`nproc`; SCRIPT_SOLR_OPTS+="-Dsolr.searchThreads=$cpus"; fi + # Default placement plugin if [[ -n "${SOLR_PLACEMENTPLUGIN_DEFAULT:-}" ]] ; then SCRIPT_SOLR_OPTS+=("-Dsolr.placementplugin.default=$SOLR_PLACEMENTPLUGIN_DEFAULT") diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index b0489450f86..ac27230d548 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -49,6 +49,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -175,6 +176,10 @@ public class CoreContainer { final SolrCores solrCores; + public Executor getCollectorExecutor() { + return collectorExecutor; + } + public static class CoreLoadFailure { public final CoreDescriptor cd; @@ -278,6 +283,8 @@ public JerseyAppHandlerCache getJerseyAppHandlerCache() { public final NodeRoles nodeRoles = new NodeRoles(System.getProperty(NodeRoles.NODE_ROLES_PROP)); + private final ExecutorService collectorExecutor; + private final ClusterSingletons clusterSingletons = new ClusterSingletons( () -> @@ -432,6 +439,12 @@ public CoreContainer(NodeConfig config, CoresLocator locator, boolean asyncSolrC this.allowPaths = allowPathBuilder.build(); this.allowListUrlChecker = AllowListUrlChecker.create(config); + + this.collectorExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool( + cfg.getIndexSearcherExecutorThreads(), // thread count + cfg.getIndexSearcherExecutorThreads() * 1000, // queue size + new SolrNamedThreadFactory("searcherCollector")); } @SuppressWarnings({"unchecked"}) @@ -657,6 +670,7 @@ protected CoreContainer(Object testConstructor) { distributedCollectionCommandRunner = Optional.empty(); allowPaths = null; allowListUrlChecker = null; + collectorExecutor = null; } public static CoreContainer createAndLoad(Path solrHome) { @@ -1248,6 +1262,7 @@ public void shutdown() { } ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor); + ExecutorUtil.shutdownAndAwaitTermination(collectorExecutor); ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool")); diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java index ef1cbbf2dfd..e6b4a4fe7e2 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java +++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java @@ -105,6 +105,8 @@ public class NodeConfig { private final int replayUpdatesThreads; + private final int indexSearcherExecutorThreads; + @Deprecated private final int transientCacheSize; private final boolean useSchemaCache; @@ -144,6 +146,7 @@ private NodeConfig( CloudConfig cloudConfig, Integer coreLoadThreads, int replayUpdatesThreads, + int indexSearcherExecutorThreads, int transientCacheSize, boolean useSchemaCache, String managementPath, @@ -183,6 +186,7 @@ private NodeConfig( this.cloudConfig = cloudConfig; this.coreLoadThreads = coreLoadThreads; this.replayUpdatesThreads = replayUpdatesThreads; + this.indexSearcherExecutorThreads = indexSearcherExecutorThreads; this.transientCacheSize = transientCacheSize; this.useSchemaCache = useSchemaCache; this.managementPath = managementPath; @@ -335,6 +339,10 @@ public int getReplayUpdatesThreads() { return replayUpdatesThreads; } + public int getIndexSearcherExecutorThreads() { + return indexSearcherExecutorThreads; + } + /** * Returns a directory, optionally a comma separated list of directories that will be added to * Solr's class path for searching for classes and plugins. The path is either absolute or @@ -597,6 +605,7 @@ public static class NodeConfigBuilder { private CloudConfig cloudConfig; private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS; private int replayUpdatesThreads = Runtime.getRuntime().availableProcessors(); + private int indexSearcherExecutorThreads = DEFAULT_INDEX_SEARCHER_EXECUTOR_THREADS; @Deprecated private int transientCacheSize = -1; private boolean useSchemaCache = false; private String managementPath; @@ -618,6 +627,9 @@ public static class NodeConfigBuilder { // No:of core load threads in cloud mode is set to a default of 8 public static final int DEFAULT_CORE_LOAD_THREADS_IN_CLOUD = 8; + public static final int DEFAULT_INDEX_SEARCHER_EXECUTOR_THREADS = + 4; + private static final String DEFAULT_CORESLOCATORCLASS = "org.apache.solr.core.CorePropertiesLocator"; private static final String DEFAULT_CORESORTERCLASS = "org.apache.solr.core.CoreSorter"; @@ -755,6 +767,11 @@ public NodeConfigBuilder setReplayUpdatesThreads(int replayUpdatesThreads) { return this; } + public NodeConfigBuilder setIndexSearcherExecutorThreads(int indexSearcherExecutorThreads) { + this.indexSearcherExecutorThreads = indexSearcherExecutorThreads; + return this; + } + // Remove in Solr 10.0 @Deprecated @@ -904,6 +921,7 @@ public NodeConfig build() { cloudConfig, coreLoadThreads, replayUpdatesThreads, + indexSearcherExecutorThreads, transientCacheSize, useSchemaCache, managementPath, diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java index 7f91d890013..b19d65320ad 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java +++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java @@ -380,6 +380,9 @@ private static NodeConfig fillSolrSection(NodeConfig.NodeConfigBuilder builder, case "replayUpdatesThreads": builder.setReplayUpdatesThreads(it.intVal(-1)); break; + case "indexSearcherExecutorThreads": + builder.setIndexSearcherExecutorThreads(it.intVal(-1)); + break; case "transientCacheSize": log.warn("solr.xml transientCacheSize -- transient cores is deprecated"); builder.setTransientCacheSize(it.intVal(-1)); diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java index 4f17cb7652b..4bc2a867ab2 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java @@ -372,10 +372,13 @@ public void process(ResponseBuilder rb) throws IOException { return; } + final boolean multiThreaded = params.getBool("multiThreaded", true); + // -1 as flag if not set. long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L); QueryCommand cmd = rb.createQueryCommand(); + cmd.setMultiThreaded(multiThreaded); cmd.setTimeAllowed(timeAllowed); cmd.setMinExactCount(getMinExactCount(params)); cmd.setDistribStatsDisabled(rb.isDistribStatsDisabled()); diff --git a/solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java b/solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java new file mode 100644 index 00000000000..f9810b1a083 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.search; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.SimpleCollector; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.automaton.ByteRunAutomaton; +import org.apache.solr.search.join.GraphQuery; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MultiThreadedSearcher { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + final SolrIndexSearcher searcher; + + public MultiThreadedSearcher(SolrIndexSearcher searcher) { + this.searcher = searcher; + } + + SearchResult searchCollectorManagers( + int len, + QueryCommand cmd, + Query query, + boolean needTopDocs, + boolean needMaxScore, + boolean needDocSet) + throws IOException { + Collection> collectors = new ArrayList<>(); + + int firstCollectorsSize = 0; + + final int firstTopDocsCollectorIndex; + if (needTopDocs) { + firstTopDocsCollectorIndex = firstCollectorsSize; + firstCollectorsSize++; + } else { + firstTopDocsCollectorIndex = -1; + } + + final int firstMaxScoreCollectorIndex; + if (needMaxScore) { + firstMaxScoreCollectorIndex = firstCollectorsSize; + firstCollectorsSize++; + } else { + firstMaxScoreCollectorIndex = -1; + } + + Collector[] firstCollectors = new Collector[firstCollectorsSize]; + + if (needTopDocs) { + + collectors.add(new TopDocsCM(len, cmd, firstCollectors, firstTopDocsCollectorIndex)); + } + if (needMaxScore) { + collectors.add(new MaxScoreCM(firstCollectors, firstMaxScoreCollectorIndex)); + } + if (needDocSet) { + int maxDoc = searcher.getRawReader().maxDoc(); + log.error("raw read max={}", searcher.getRawReader().maxDoc()); + + collectors.add(new DocSetCM(maxDoc)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorManager[] colls = collectors.toArray(new CollectorManager[0]); + SolrMultiCollectorManager manager = new SolrMultiCollectorManager(colls); + Object[] ret; + try { + ret = searcher.search(query, manager); + } catch (Exception ex) { + if (ex instanceof RuntimeException + && ex.getCause() != null + && ex.getCause() instanceof ExecutionException + && ex.getCause().getCause() != null + && ex.getCause().getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause().getCause(); + } else { + throw ex; + } + } + + ScoreMode scoreMode = SolrMultiCollectorManager.scoreMode(firstCollectors); + + return new SearchResult(scoreMode, ret); + } + + static boolean allowMT(DelegatingCollector postFilter, QueryCommand cmd, Query query) { + if (postFilter != null + || cmd.getSegmentTerminateEarly() + || cmd.getTimeAllowed() > 0 + || !cmd.getMultiThreaded()) { + return false; + } else { + MTCollectorQueryCheck allowMT = new MTCollectorQueryCheck(); + query.visit(allowMT); + return allowMT.allowed(); + } + } + + /** + * A {@link QueryVisitor} that recurses through the query tree, determining if all queries support + * multi-threaded collecting. + */ + private static class MTCollectorQueryCheck extends QueryVisitor { + + private QueryVisitor subVisitor = this; + + private boolean allowMt(Query query) { + if (query instanceof RankQuery || query instanceof GraphQuery || query instanceof JoinQuery) { + return false; + } + return true; + } + + @Override + public void consumeTerms(Query query, Term... terms) { + if (!allowMt(query)) { + subVisitor = EMPTY_VISITOR; + } + } + + @Override + public void consumeTermsMatching( + Query query, String field, Supplier automaton) { + if (!allowMt(query)) { + subVisitor = EMPTY_VISITOR; + } else { + super.consumeTermsMatching(query, field, automaton); + } + } + + @Override + public void visitLeaf(Query query) { + if (!allowMt(query)) { + subVisitor = EMPTY_VISITOR; + } + } + + @Override + public QueryVisitor getSubVisitor(BooleanClause.Occur occur, Query parent) { + return subVisitor; + } + + public boolean allowed() { + return subVisitor != EMPTY_VISITOR; + } + } + + static class MaxScoreResult { + final float maxScore; + + public MaxScoreResult(float maxScore) { + this.maxScore = maxScore; + } + } + + static class FixedBitSetCollector extends SimpleCollector { + @SuppressWarnings("JdkObsolete") + private final LinkedList bitSets = new LinkedList<>(); + + @SuppressWarnings("JdkObsolete") + private final LinkedList skipWords = new LinkedList<>(); + + @SuppressWarnings("JdkObsolete") + private final LinkedList skipBits = new LinkedList<>(); + + FixedBitSetCollector() {} + + @Override + protected void doSetNextReader(LeafReaderContext context) throws IOException { + this.bitSets.add(null); // lazy allocate when collecting document(s) + this.skipWords.add(context.docBase / 64); + this.skipBits.add(context.docBase % 64); + } + + @Override + public void collect(int doc) throws IOException { + FixedBitSet bitSet = this.bitSets.getLast(); + final int idx = this.skipBits.getLast() + doc; + + final int numWords = FixedBitSet.bits2words(idx + 1); // +1 to ensure minimum 1 word + + if (bitSet == null) { + this.bitSets.removeLast(); + bitSet = new FixedBitSet(numWords * 64); + this.bitSets.addLast(bitSet); + + } else if (bitSet.getBits().length < numWords) { + FixedBitSet smallerBitSet = this.bitSets.removeLast(); + bitSet = new FixedBitSet(numWords * 64); + bitSet.xor(smallerBitSet); + this.bitSets.addLast(bitSet); + } + + bitSet.set(idx); + } + + void update(FixedBitSet allBitSet) { + final long[] allBits = allBitSet.getBits(); + for (int bs_idx = 0; bs_idx < this.bitSets.size(); ++bs_idx) { + final FixedBitSet itBitSet = this.bitSets.get(bs_idx); + if (itBitSet != null) { + final int skipWords = this.skipWords.get(bs_idx); + final long[] itBits = itBitSet.getBits(); + for (int idx = 0; idx < itBits.length && skipWords + idx < allBits.length; ++idx) { + allBits[skipWords + idx] ^= itBits[idx]; + } + } + } + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + } + + static class SearchResult { + final ScoreMode scoreMode; + private final Object[] result; + + public SearchResult(ScoreMode scoreMode, Object[] result) { + this.scoreMode = scoreMode; + this.result = result; + } + + public TopDocsResult getTopDocsResult() { + for (Object res : result) { + if (res instanceof TopDocsResult) { + return (TopDocsResult) res; + } + } + return null; + } + + public float getMaxScore(int totalHits) { + if (totalHits > 0) { + for (Object res : result) { + if (res instanceof MaxScoreResult) { + return ((MaxScoreResult) res).maxScore; + } + } + return Float.NaN; + } else { + return 0.0f; + } + } + + public FixedBitSet getFixedBitSet() { + for (Object res : result) { + if (res instanceof FixedBitSet) { + return (FixedBitSet) res; + } + } + return null; + } + } + + private static class MaxScoreCM implements CollectorManager { + private final Collector[] firstCollectors; + private final int firstMaxScoreCollectorIndex; + + public MaxScoreCM(Collector[] firstCollectors, int firstMaxScoreCollectorIndex) { + this.firstCollectors = firstCollectors; + this.firstMaxScoreCollectorIndex = firstMaxScoreCollectorIndex; + } + + @Override + public Collector newCollector() throws IOException { + MaxScoreCollector collector = new MaxScoreCollector(); + if (firstCollectors[firstMaxScoreCollectorIndex] == null) { + firstCollectors[firstMaxScoreCollectorIndex] = collector; + } + return collector; + } + + @Override + @SuppressWarnings("rawtypes") + public Object reduce(Collection collectors) throws IOException { + + MaxScoreCollector collector; + float maxScore = 0.0f; + for (Iterator var4 = collectors.iterator(); + var4.hasNext(); + maxScore = Math.max(maxScore, collector.getMaxScore())) { + collector = (MaxScoreCollector) var4.next(); + } + + return new MaxScoreResult(maxScore); + } + } + + private static class DocSetCM implements CollectorManager { + private final int maxDoc; + + public DocSetCM(int maxDoc) { + this.maxDoc = maxDoc; + } + + @Override + public Collector newCollector() throws IOException { + // TODO: add to firstCollectors here? or if not have comment w.r.t. why not adding + return new FixedBitSetCollector(); + } + + @Override + @SuppressWarnings({"rawtypes"}) + public Object reduce(Collection collectors) throws IOException { + final FixedBitSet reduced = new FixedBitSet(maxDoc); + for (Object collector : collectors) { + if (collector instanceof FixedBitSetCollector) { + FixedBitSetCollector fixedBitSetCollector = (FixedBitSetCollector) collector; + fixedBitSetCollector.update(reduced); + } + } + return reduced; + } + } + + private class TopDocsCM implements CollectorManager { + private final int len; + private final QueryCommand cmd; + private final Collector[] firstCollectors; + private final int firstTopDocsCollectorIndex; + + public TopDocsCM( + int len, QueryCommand cmd, Collector[] firstCollectors, int firstTopDocsCollectorIndex) { + this.len = len; + this.cmd = cmd; + this.firstCollectors = firstCollectors; + this.firstTopDocsCollectorIndex = firstTopDocsCollectorIndex; + } + + @Override + public Collector newCollector() throws IOException { + @SuppressWarnings("rawtypes") + TopDocsCollector collector = searcher.buildTopDocsCollector(len, cmd); + if (firstCollectors[firstTopDocsCollectorIndex] == null) { + firstCollectors[firstTopDocsCollectorIndex] = collector; + } + return collector; + } + + @Override + @SuppressWarnings("rawtypes") + public Object reduce(Collection collectors) throws IOException { + + TopDocs[] topDocs = new TopDocs[collectors.size()]; + + int totalHits = -1; + int i = 0; + + Collector collector; + for (Object o : collectors) { + collector = (Collector) o; + if (collector instanceof TopDocsCollector) { + TopDocs td = ((TopDocsCollector) collector).topDocs(0, len); + assert td != null : Arrays.asList(topDocs); + topDocs[i++] = td; + } + } + + TopDocs mergedTopDocs = null; + + if (topDocs.length > 0 && topDocs[0] != null) { + if (topDocs[0] instanceof TopFieldDocs) { + TopFieldDocs[] topFieldDocs = + Arrays.copyOf(topDocs, topDocs.length, TopFieldDocs[].class); + mergedTopDocs = TopFieldDocs.merge(searcher.weightSort(cmd.getSort()), len, topFieldDocs); + } else { + mergedTopDocs = TopDocs.merge(0, len, topDocs); + } + totalHits = (int) mergedTopDocs.totalHits.value; + } + return new TopDocsResult(mergedTopDocs, totalHits); + } + } + + static class TopDocsResult { + final TopDocs topDocs; + final int totalHits; + + public TopDocsResult(TopDocs topDocs, int totalHits) { + this.topDocs = topDocs; + this.totalHits = totalHits; + } + } +} diff --git a/solr/core/src/java/org/apache/solr/search/QueryCommand.java b/solr/core/src/java/org/apache/solr/search/QueryCommand.java index 7ed6b6b9473..0622bb76567 100755 --- a/solr/core/src/java/org/apache/solr/search/QueryCommand.java +++ b/solr/core/src/java/org/apache/solr/search/QueryCommand.java @@ -36,6 +36,7 @@ public class QueryCommand { private int len; private int supersetMaxDoc; private int flags; + private boolean multiThreaded = false; private long timeAllowed = -1; private int minExactCount = Integer.MAX_VALUE; private CursorMark cursorMark; @@ -152,6 +153,15 @@ public QueryCommand clearFlags(int flags) { return this; } + public boolean getMultiThreaded() { + return multiThreaded; + } + + public QueryCommand setMultiThreaded(boolean multiThreaded) { + this.multiThreaded = multiThreaded; + return this; + } + public long getTimeAllowed() { return timeAllowed; } diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java index 3b4ce7e9e6d..1f589d17415 100644 --- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java +++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java @@ -335,7 +335,7 @@ public SolrIndexSearcher( boolean reserveDirectory, DirectoryFactory directoryFactory) throws IOException { - super(wrapReader(core, r)); + super(wrapReader(core, r), core.getCoreContainer().getCollectorExecutor()); this.path = path; this.directoryFactory = directoryFactory; @@ -1793,7 +1793,7 @@ private void populateNextCursorMarkFromTopDocs(QueryResult qr, QueryCommand qc, * @param len the number of docs to return * @param cmd The Command whose properties should determine the type of TopDocsCollector to use. */ - private TopDocsCollector buildTopDocsCollector(int len, QueryCommand cmd) + TopDocsCollector buildTopDocsCollector(int len, QueryCommand cmd) throws IOException { int minNumFound = cmd.getMinExactCount(); Query q = cmd.getQuery(); @@ -1820,11 +1820,11 @@ private void getDocListNC(QueryResult qr, QueryCommand cmd) throws IOException { int last = len; if (last < 0 || last > maxDoc()) last = maxDoc(); final int lastDocRequested = last; - final int nDocsReturned; - final int totalHits; - final float maxScore; - final int[] ids; - final float[] scores; + int nDocsReturned = 0; + int totalHits; + float maxScore; + int[] ids; + float[] scores; final boolean needScores = (cmd.getFlags() & GET_SCORES) != 0; @@ -1879,7 +1879,6 @@ public ScoreMode scoreMode() { buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter); - nDocsReturned = 0; ids = new int[nDocsReturned]; scores = new float[nDocsReturned]; totalHits = numHits[0]; @@ -1888,25 +1887,51 @@ public ScoreMode scoreMode() { qr.setNextCursorMark(cmd.getCursorMark()); hitsRelation = Relation.EQUAL_TO; } else { - final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd); - MaxScoreCollector maxScoreCollector = null; - Collector collector = topCollector; - if (needScores) { - maxScoreCollector = new MaxScoreCollector(); - collector = MultiCollector.wrap(topCollector, maxScoreCollector); + if (log.isInfoEnabled()) { + log.info("calling from 2, query: {}", query.getClass()); + } + final TopDocs topDocs; + final ScoreMode scoreModeUsed; + if (!MultiThreadedSearcher.allowMT(pf.postFilter, cmd, query)) { + if (log.isInfoEnabled()) { + log.info("skipping collector manager"); + } + final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd); + MaxScoreCollector maxScoreCollector = null; + Collector collector = topCollector; + if (needScores) { + maxScoreCollector = new MaxScoreCollector(); + collector = MultiCollector.wrap(topCollector, maxScoreCollector); + } + scoreModeUsed = + buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter).scoreMode(); + + totalHits = topCollector.getTotalHits(); + topDocs = topCollector.topDocs(0, len); + + maxScore = + totalHits > 0 + ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) + : 0.0f; + } else { + if (log.isInfoEnabled()) { + log.info("using CollectorManager"); + } + final MultiThreadedSearcher.SearchResult searchResult = + new MultiThreadedSearcher(this) + .searchCollectorManagers(len, cmd, query, true, needScores, false); + scoreModeUsed = searchResult.scoreMode; + + MultiThreadedSearcher.TopDocsResult topDocsResult = searchResult.getTopDocsResult(); + totalHits = topDocsResult.totalHits; + topDocs = topDocsResult.topDocs; + + maxScore = searchResult.getMaxScore(totalHits); } - final ScoreMode scoreModeUsed = - buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter).scoreMode(); - totalHits = topCollector.getTotalHits(); - final TopDocs topDocs = topCollector.topDocs(0, len); hitsRelation = populateScoresIfNeeded(cmd, needScores, topDocs, query, scoreModeUsed); populateNextCursorMarkFromTopDocs(qr, cmd, topDocs); - maxScore = - totalHits > 0 - ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) - : 0.0f; nDocsReturned = topDocs.scoreDocs.length; ids = new int[nDocsReturned]; scores = needScores ? new float[nDocsReturned] : null; @@ -1991,32 +2016,61 @@ public ScoreMode scoreMode() { // no docs on this page, so cursor doesn't change qr.setNextCursorMark(cmd.getCursorMark()); } else { - final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd); - final DocSetCollector setCollector = new DocSetCollector(maxDoc); - MaxScoreCollector maxScoreCollector = null; - List collectors = new ArrayList<>(Arrays.asList(topCollector, setCollector)); + final TopDocs topDocs; + if (!MultiThreadedSearcher.allowMT(pf.postFilter, cmd, query)) { + @SuppressWarnings({"rawtypes"}) + final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd); + final DocSetCollector setCollector = new DocSetCollector(maxDoc); + MaxScoreCollector maxScoreCollector = null; + List collectors = new ArrayList<>(Arrays.asList(topCollector, setCollector)); + + if (needScores) { + maxScoreCollector = new MaxScoreCollector(); + collectors.add(maxScoreCollector); + } - if (needScores) { - maxScoreCollector = new MaxScoreCollector(); - collectors.add(maxScoreCollector); - } + Collector collector = MultiCollector.wrap(collectors); - final Collector collector = MultiCollector.wrap(collectors); + buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter); - buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter); + set = DocSetUtil.getDocSet(setCollector, this); - set = DocSetUtil.getDocSet(setCollector, this); + totalHits = topCollector.getTotalHits(); + assert (totalHits == set.size()) || qr.isPartialResults(); - totalHits = topCollector.getTotalHits(); - assert (totalHits == set.size()) || qr.isPartialResults(); + topDocs = topCollector.topDocs(0, len); + maxScore = + totalHits > 0 + ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) + : 0.0f; + } else { + log.debug("using CollectorManager"); + + boolean needMaxScore = needScores; + MultiThreadedSearcher.SearchResult searchResult = + new MultiThreadedSearcher(this) + .searchCollectorManagers(len, cmd, query, true, needMaxScore, true); + MultiThreadedSearcher.TopDocsResult topDocsResult = searchResult.getTopDocsResult(); + totalHits = topDocsResult.totalHits; + topDocs = topDocsResult.topDocs; + maxScore = searchResult.getMaxScore(totalHits); + set = new BitDocSet(searchResult.getFixedBitSet()); + + // TODO: Is this correct? + // hitsRelation = populateScoresIfNeeded(cmd, needScores, topDocs, query, + // searchResult.scoreMode); + + // nDocsReturned = topDocs.scoreDocs.length; + // TODO: Is this correct? + // hitsRelation = topDocs.totalHits.relation; + // } else { + // hitsRelation = Relation.EQUAL_TO; + // } + + } - final TopDocs topDocs = topCollector.topDocs(0, len); populateScoresIfNeeded(cmd, needScores, topDocs, query, ScoreMode.COMPLETE); populateNextCursorMarkFromTopDocs(qr, cmd, topDocs); - maxScore = - totalHits > 0 - ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) - : 0.0f; nDocsReturned = topDocs.scoreDocs.length; ids = new int[nDocsReturned]; diff --git a/solr/core/src/java/org/apache/solr/search/SolrMultiCollectorManager.java b/solr/core/src/java/org/apache/solr/search/SolrMultiCollectorManager.java new file mode 100644 index 00000000000..e2b7e6fe7af --- /dev/null +++ b/solr/core/src/java/org/apache/solr/search/SolrMultiCollectorManager.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.search; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.FilterScorable; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.MultiCollector; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; + +/** + * A {@link CollectorManager} implements which wrap a set of {@link CollectorManager} as {@link + * MultiCollector} acts for {@link Collector}. + */ +public class SolrMultiCollectorManager + implements CollectorManager { + + private final CollectorManager[] collectorManagers; + + @SafeVarargs + @SuppressWarnings({"varargs", "unchecked"}) + public SolrMultiCollectorManager( + final CollectorManager... collectorManagers) { + if (collectorManagers.length < 1) { + throw new IllegalArgumentException("There must be at least one collector"); + } + this.collectorManagers = (CollectorManager[]) collectorManagers; + } + + @Override + public Collectors newCollector() throws IOException { + return new Collectors(); + } + + @Override + public Object[] reduce(Collection reducableCollectors) throws IOException { + final int size = reducableCollectors.size(); + final Object[] results = new Object[collectorManagers.length]; + for (int i = 0; i < collectorManagers.length; i++) { + final List reducableCollector = new ArrayList<>(size); + for (Collectors collectors : reducableCollectors) + reducableCollector.add(collectors.collectors[i]); + results[i] = collectorManagers[i].reduce(reducableCollector); + } + return results; + } + + // TODO: could Lucene's MultiCollector permit reuse of its logic? + public static ScoreMode scoreMode(Collector[] collectors) { + ScoreMode scoreMode = null; + for (Collector collector : collectors) { + if (scoreMode == null) { + scoreMode = collector.scoreMode(); + } else if (scoreMode != collector.scoreMode()) { + return ScoreMode.COMPLETE; + } + } + return scoreMode; + } + + /** Wraps multiple collectors for processing */ + class Collectors implements Collector { + + private final Collector[] collectors; + + private Collectors() throws IOException { + collectors = new Collector[collectorManagers.length]; + for (int i = 0; i < collectors.length; i++) + collectors[i] = collectorManagers[i].newCollector(); + } + + @Override + public final LeafCollector getLeafCollector(final LeafReaderContext context) + throws IOException { + return new LeafCollectors(context, scoreMode() == ScoreMode.TOP_SCORES); + } + + @Override + public final ScoreMode scoreMode() { + return SolrMultiCollectorManager.scoreMode(collectors); + } + + /** + * Wraps multiple leaf collectors and delegates collection across each one + * + * @lucene.internal + */ + private class LeafCollectors implements LeafCollector { + + private final LeafCollector[] leafCollectors; + private final boolean skipNonCompetitiveScores; + + private LeafCollectors(final LeafReaderContext context, boolean skipNonCompetitiveScores) + throws IOException { + this.skipNonCompetitiveScores = skipNonCompetitiveScores; + leafCollectors = new LeafCollector[collectors.length]; + for (int i = 0; i < collectors.length; i++) + leafCollectors[i] = collectors[i].getLeafCollector(context); + } + + @Override + public final void setScorer(final Scorable scorer) throws IOException { + if (skipNonCompetitiveScores) { + for (LeafCollector leafCollector : leafCollectors) + if (leafCollector != null) leafCollector.setScorer(scorer); + } else { + FilterScorable fScorer = + new FilterScorable(scorer) { + @Override + public void setMinCompetitiveScore(float minScore) throws IOException { + // Ignore calls to setMinCompetitiveScore so that if we wrap two + // collectors and one of them wants to skip low-scoring hits, then + // the other collector still sees all hits. + } + }; + for (LeafCollector leafCollector : leafCollectors) { + if (leafCollector != null) { + leafCollector.setScorer(fScorer); + } + } + } + } + + @Override + public final void collect(final int doc) throws IOException { + for (LeafCollector leafCollector : leafCollectors) { + if (leafCollector != null) { + leafCollector.collect(doc); + } + } + } + } + } +} diff --git a/solr/core/src/test-files/solr/solr-50-all.xml b/solr/core/src/test-files/solr/solr-50-all.xml index efa2abd4e6a..53a682050ca 100644 --- a/solr/core/src/test-files/solr/solr-50-all.xml +++ b/solr/core/src/test-files/solr/solr-50-all.xml @@ -30,6 +30,7 @@ testCoreSorter 66 100 + 7 42 true diff --git a/solr/core/src/test-files/solr/solr-stress-new.xml b/solr/core/src/test-files/solr/solr-stress-new.xml index 434ef91067d..2294e39ffd7 100644 --- a/solr/core/src/test-files/solr/solr-stress-new.xml +++ b/solr/core/src/test-files/solr/solr-stress-new.xml @@ -21,6 +21,7 @@ --> + 4 127.0.0.1 8983 diff --git a/solr/core/src/test-files/solr/solr-trackingshardhandler.xml b/solr/core/src/test-files/solr/solr-trackingshardhandler.xml index 201177105ad..532a03aacd7 100644 --- a/solr/core/src/test-files/solr/solr-trackingshardhandler.xml +++ b/solr/core/src/test-files/solr/solr-trackingshardhandler.xml @@ -24,6 +24,7 @@ ${shareSchema:false} ${configSetBaseDir:configsets} ${coreRootDirectory:.} + 4 127.0.0.1 diff --git a/solr/core/src/test-files/solr/solr.xml b/solr/core/src/test-files/solr/solr.xml index b2606a44c6d..9b34042f256 100644 --- a/solr/core/src/test-files/solr/solr.xml +++ b/solr/core/src/test-files/solr/solr.xml @@ -29,6 +29,7 @@ ${solr.allowPaths:} ${solr.tests.allowUrls:} ${solr.hideStackTrace:true} + 4 ${urlScheme:} diff --git a/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java b/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java index dac4ec5124c..ea02e448da8 100644 --- a/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java +++ b/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java @@ -185,7 +185,9 @@ public void testDistribLimit() throws Exception { "stages", "prepare,process", "cpuAllowed", - "50")); + "50", + "multiThreaded", + "false")); // System.err.println("rsp=" + rsp.jsonStr()); assertNotNull("should have partial results", rsp.getHeader().get("partialResults")); @@ -204,7 +206,9 @@ public void testDistribLimit() throws Exception { "stages", "prepare,process", "cpuAllowed", - "50")); + "50", + "multiThreaded", + "false")); // System.err.println("rsp=" + rsp.jsonStr()); assertNotNull("should have partial results", rsp.getHeader().get("partialResults")); } diff --git a/solr/core/src/test/org/apache/solr/search/TestFiltering.java b/solr/core/src/test/org/apache/solr/search/TestFiltering.java index 5d9ad6bb9da..e9194e170f9 100644 --- a/solr/core/src/test/org/apache/solr/search/TestFiltering.java +++ b/solr/core/src/test/org/apache/solr/search/TestFiltering.java @@ -22,6 +22,7 @@ import java.util.Locale; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Query; +import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.common.SolrInputDocument; @@ -92,14 +93,14 @@ public void testLiveDocsSharing() throws Exception { QueryResult res = new QueryResult(); searcher.search(res, cmd); set = res.getDocSet(); - assertSame(set, live); + assertEffectivelySame(set.getFixedBitSet(), live.getFixedBitSet()); cmd.setQuery(QParser.getParser(qstr + " OR id:0", null, req).getQuery()); cmd.setFilterList(QParser.getParser(qstr + " OR id:1", null, req).getQuery()); res = new QueryResult(); searcher.search(res, cmd); set = res.getDocSet(); - assertSame(set, live); + assertEffectivelySame(set.getFixedBitSet(), live.getFixedBitSet()); } } finally { @@ -107,6 +108,19 @@ public void testLiveDocsSharing() throws Exception { } } + /** If the a XOR b == 0, then both a & b are effectively the same bitset */ + private void assertEffectivelySame(FixedBitSet a, FixedBitSet b) { + FixedBitSet xor = a.clone(); + xor.xor(b); + assertEquals(new FixedBitSet(xor.length()), xor); + } + + private String bitsString(Bits bits) { + StringBuilder s = new StringBuilder(); + for (int i = 0; i < bits.length(); i++) s.append(bits.get(i) ? 1 : 0); + return s.toString(); + } + public void testCaching() throws Exception { clearIndex(); assertU(adoc("id", "4", "val_i", "1")); diff --git a/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java b/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java index 6e413f6bfd2..ae1b0a32d6c 100644 --- a/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java +++ b/solr/core/src/test/org/apache/solr/search/TestQueryLimits.java @@ -76,7 +76,17 @@ public void testQueryLimits() throws Exception { rsp = solrClient.query( COLLECTION, - params("q", "id:*", "sort", "id asc", "facet", "true", "facet.field", "val_i")); + params( + "q", + "id:*", + "sort", + "id asc", + "facet", + "true", + "facet.field", + "val_i", + "multiThreaded", + "false")); assertNotNull( "should have partial results for expr " + matchingExpr, rsp.getHeader().get("partialResults")); diff --git a/solr/server/solr/solr.xml b/solr/server/solr/solr.xml index bf364331214..4fd60dd6823 100644 --- a/solr/server/solr/solr.xml +++ b/solr/server/solr/solr.xml @@ -34,6 +34,7 @@ ${solr.allowPaths:} ${solr.allowUrls:} ${solr.hideStackTrace:false} + ${solr.searchThreads:4} diff --git a/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc b/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc index 052165c0f7a..22e8d8a7071 100644 --- a/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc +++ b/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc @@ -178,6 +178,15 @@ Specifies the number of threads that will be assigned to replay updates in paral This pool is shared for all cores of the node. The default value is equal to the number of processors. +`indexSearcherExecutorThreads`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: 4 +|=== ++ +Specifies the number of threads that will be assigned for search queries. + `coreRootDirectory`:: + [%autowidth,frame=none]