From 7405bb19fa424ec79c6daaafd986670dc54d7dfe Mon Sep 17 00:00:00 2001 From: David Smiley Date: Mon, 14 Oct 2024 10:46:49 -0400 Subject: [PATCH] SOLR-17414: multiThreaded search: don't throw RejectedExecutionException (#2701) When searching with multiThreaded=true, the internal tasks may now block instead of enqueuing with a risk of rejection. Solr will use less resources under stress but to get the most of your machine, you may want to increase the thread pool. And: * Revert ExecutorUtil change from SOLR-17298 * Revert ExecutorUtil change from SOLR-13350 * rename CoreContainer.collectorManager to indexSearcherExecutor --- solr/CHANGES.txt | 6 +- .../org/apache/solr/core/CoreContainer.java | 25 ++----- .../java/org/apache/solr/core/NodeConfig.java | 1 - .../apache/solr/search/SolrIndexSearcher.java | 48 +++++++++++- .../pages/common-query-parameters.adoc | 5 +- .../apache/solr/common/util/ExecutorUtil.java | 73 +------------------ 6 files changed, 65 insertions(+), 93 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1032406f538..1b60d044ad0 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -10,7 +10,7 @@ New Features --------------------- * SOLR-14496: Solr CLI commands now can interact with a Solr secured using Basic Authentication. (Eric Pugh) -* SOLR-17467: Solr CLI bin/solr start defaults to starting Solr in Cloud mode, use --user-managed switch for User Managed (aka Standalone) mode. (Eric Pugh) +* SOLR-17467: Solr CLI bin/solr start defaults to starting Solr in Cloud mode, use --user-managed switch for User Managed (aka Standalone) mode. (Eric Pugh) Improvements --------------------- @@ -141,6 +141,10 @@ Improvements override an HTTP client's base URL may use `Http2SolrClient.requestWithBaseUrl` instead. (Jason Gerlowski, Sanjay Dutt, David Smiley) +* SOLR-17414: When searching with multiThreaded=true, the internal tasks may now block instead of + enqueuing with a risk of rejection. 
Solr will use fewer resources under stress, but to get the most + out of your machine, you may want to increase the thread pool. (David Smiley) + Optimizations --------------------- * SOLR-14985: Solrj CloudSolrClient with Solr URLs had serious performance regressions (since the diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index b78475d8ad5..a92c73ef496 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -26,7 +26,6 @@ import static org.apache.solr.common.params.CommonParams.METRICS_PATH; import static org.apache.solr.common.params.CommonParams.ZK_PATH; import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH; -import static org.apache.solr.search.CpuAllowedLimit.TIMING_CONTEXT; import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP; import com.github.benmanes.caffeine.cache.Interner; @@ -144,6 +143,7 @@ import org.apache.solr.search.CacheConfig; import org.apache.solr.search.SolrCache; import org.apache.solr.search.SolrFieldCacheBean; +import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.security.AllowListUrlChecker; import org.apache.solr.security.AuditLoggerPlugin; import org.apache.solr.security.AuthenticationPlugin; @@ -158,7 +158,6 @@ import org.apache.solr.util.OrderedExecutor; import org.apache.solr.util.RefCounted; import org.apache.solr.util.StartupLoggingUtils; -import org.apache.solr.util.ThreadCpuTimer; import org.apache.solr.util.stats.MetricUtils; import org.apache.zookeeper.KeeperException; import org.glassfish.hk2.utilities.binding.AbstractBinder; @@ -183,8 +182,8 @@ public class CoreContainer { final SolrCores solrCores; - public Executor getCollectorExecutor() { - return collectorExecutor; + public Executor getIndexSearcherExecutor() { + return indexSearcherExecutor; } public static class CoreLoadFailure { @@ 
-290,7 +289,7 @@ public JerseyAppHandlerCache getJerseyAppHandlerCache() { public final NodeRoles nodeRoles = new NodeRoles(System.getProperty(NodeRoles.NODE_ROLES_PROP)); - private final ExecutorService collectorExecutor; + private final ExecutorService indexSearcherExecutor; private final ClusterSingletons clusterSingletons = new ClusterSingletons( @@ -448,17 +447,7 @@ public CoreContainer(NodeConfig config, CoresLocator locator, boolean asyncSolrC this.allowListUrlChecker = AllowListUrlChecker.create(config); - final int indexSearcherExecutorThreads = cfg.getIndexSearcherExecutorThreads(); - if (0 < indexSearcherExecutorThreads) { - this.collectorExecutor = - ExecutorUtil.newMDCAwareFixedThreadPool( - indexSearcherExecutorThreads, // thread count - indexSearcherExecutorThreads * 1000, // queue size - new SolrNamedThreadFactory("searcherCollector"), - () -> ThreadCpuTimer.reset(TIMING_CONTEXT)); - } else { - this.collectorExecutor = null; - } + this.indexSearcherExecutor = SolrIndexSearcher.initCollectorExecutor(cfg); } @SuppressWarnings({"unchecked"}) @@ -685,7 +674,7 @@ protected CoreContainer(Object testConstructor) { distributedCollectionCommandRunner = Optional.empty(); allowPaths = null; allowListUrlChecker = null; - collectorExecutor = null; + indexSearcherExecutor = null; } public static CoreContainer createAndLoad(Path solrHome) { @@ -1288,7 +1277,7 @@ public void shutdown() { } ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor); - ExecutorUtil.shutdownAndAwaitTermination(collectorExecutor); + ExecutorUtil.shutdownAndAwaitTermination(indexSearcherExecutor); ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool")); diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java index dbc44336484..72ec1bd5ee0 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java +++ 
b/solr/core/src/java/org/apache/solr/core/NodeConfig.java @@ -629,7 +629,6 @@ public static class NodeConfigBuilder { public static final int DEFAULT_INDEX_SEARCHER_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors(); - ; private static final String DEFAULT_CORESLOCATORCLASS = "org.apache.solr.core.CorePropertiesLocator"; diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java index 59830187f41..3c8afde39ff 100644 --- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java +++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java @@ -16,6 +16,8 @@ */ package org.apache.solr.search; +import static org.apache.solr.search.CpuAllowedLimit.TIMING_CONTEXT; + import com.codahale.metrics.Gauge; import java.io.Closeable; import java.io.IOException; @@ -31,6 +33,8 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -92,9 +96,12 @@ import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.CollectionUtil; +import org.apache.solr.common.util.ExecutorUtil.MDCAwareThreadPoolExecutor; import org.apache.solr.common.util.ObjectReleaseTracker; +import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.core.DirectoryFactory; import org.apache.solr.core.DirectoryFactory.DirContext; +import org.apache.solr.core.NodeConfig; import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrInfoBean; @@ -115,6 +122,7 @@ import org.apache.solr.update.IndexFingerprint; import org.apache.solr.update.SolrIndexConfig; import 
org.apache.solr.util.IOFunction; +import org.apache.solr.util.ThreadCpuTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,6 +218,44 @@ private static DirectoryReader wrapReader(SolrCore core, DirectoryReader reader) return reader; } + /** + * Create an {@link ExecutorService} to be used by the Lucene {@link IndexSearcher#getExecutor()}. + * Shared across the whole node because it's a machine CPU resource. + */ + public static ExecutorService initCollectorExecutor(NodeConfig cfg) { + final int indexSearcherExecutorThreads = cfg.getIndexSearcherExecutorThreads(); + if (0 >= indexSearcherExecutorThreads) { + return null; + } + + return new MDCAwareThreadPoolExecutor( + indexSearcherExecutorThreads, + indexSearcherExecutorThreads, + 0L, + TimeUnit.MILLISECONDS, + new SynchronousQueue<>(true) { // fairness + // a hack to force ThreadPoolExecutor to block if threads are busy + // -- otherwise it will throw RejectedExecutionException; unacceptable + @Override + public boolean offer(Runnable runnable) { // is supposed to not block, but we do anyway + try { + put(runnable); // blocks + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("interrupted submitting to search multi-threaded pool", e); + } + return true; + } + }, + new SolrNamedThreadFactory("searcherCollector")) { + + @Override + protected void beforeExecute(Thread t, Runnable r) { + ThreadCpuTimer.reset(TIMING_CONTEXT); + } + }; + } + /** * Builds the necessary collector chain (via delegate wrapping) and executes the query against it. 
* This method takes into consideration both the explicitly provided collector and postFilter as @@ -336,7 +382,7 @@ public SolrIndexSearcher( boolean reserveDirectory, DirectoryFactory directoryFactory) throws IOException { - super(wrapReader(core, r), core.getCoreContainer().getCollectorExecutor()); + super(wrapReader(core, r), core.getCoreContainer().getIndexSearcherExecutor()); this.path = path; this.directoryFactory = directoryFactory; diff --git a/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc b/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc index cacbddee4ac..e0684f9abd4 100644 --- a/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc +++ b/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc @@ -428,7 +428,10 @@ Similar to using <>, when ear |=== This parameter set to `true` or `false` controls if Solr may use more than one thread to satisfy the request. -A `true` value presently allows the IndexSearcher to search across Lucene's segments in parallel, and the xref:configuration-guide:configuring-solr-xml.adoc#indexSearcherExecutorThreads[indexSearcherExecutorThreads] value can be customised in the `solr.xml` file. This parameter is ignored in the presence of `&segmentsTerminateEarly=true` (future work may enable it). This is a new parameter and is considered experimental and subject to change or removal in subsequent releases. Please share your feedback and experiences with it on our mailing lists. +A `true` value presently allows the IndexSearcher to search across Lucene's segments in parallel, and the xref:configuration-guide:configuring-solr-xml.adoc#indexSearcherExecutorThreads[indexSearcherExecutorThreads] value can be customised in the `solr.xml` file. +This parameter is ignored in the presence of `&segmentsTerminateEarly=true` (future work may enable it). 
+This is a new parameter and is considered experimental and subject to change or removal in subsequent releases. +Please share your feedback and experiences with it on our mailing lists. == omitHeader Parameter diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index 29873bf09b9..129ec8df8ca 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -201,18 +201,6 @@ public static ExecutorService newMDCAwareFixedThreadPool( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); } - public static ExecutorService newMDCAwareFixedThreadPool( - int nThreads, int queueCapacity, ThreadFactory threadFactory, Runnable beforeExecute) { - return new MDCAwareThreadPoolExecutor( - nThreads, - nThreads, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(queueCapacity), - threadFactory, - beforeExecute); - } - /** * See {@link java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)}. Note the * thread is always active, even if no tasks are submitted to the executor. 
@@ -275,10 +263,8 @@ public static ExecutorService newMDCAwareCachedThreadPool( public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor { private static final int MAX_THREAD_NAME_LEN = 512; - public static final Runnable NOOP = () -> {}; private final boolean enableSubmitterStackTrace; - private final Runnable beforeExecuteTask; public MDCAwareThreadPoolExecutor( int corePoolSize, @@ -290,7 +276,6 @@ public MDCAwareThreadPoolExecutor( RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.enableSubmitterStackTrace = true; - this.beforeExecuteTask = NOOP; } public MDCAwareThreadPoolExecutor( @@ -301,7 +286,6 @@ public MDCAwareThreadPoolExecutor( BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.enableSubmitterStackTrace = true; - this.beforeExecuteTask = NOOP; } public MDCAwareThreadPoolExecutor( @@ -311,27 +295,7 @@ public MDCAwareThreadPoolExecutor( TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { - this( - corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, true, NOOP); - } - - public MDCAwareThreadPoolExecutor( - int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - ThreadFactory threadFactory, - Runnable beforeExecuteTask) { - this( - corePoolSize, - maximumPoolSize, - keepAliveTime, - unit, - workQueue, - threadFactory, - true, - beforeExecuteTask); + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, true); } public MDCAwareThreadPoolExecutor( @@ -341,11 +305,9 @@ public MDCAwareThreadPoolExecutor( TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, - boolean enableSubmitterStackTrace, - Runnable beforeExecuteTask) { + boolean enableSubmitterStackTrace) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); 
this.enableSubmitterStackTrace = enableSubmitterStackTrace; - this.beforeExecuteTask = beforeExecuteTask; } public MDCAwareThreadPoolExecutor( @@ -357,37 +319,6 @@ public MDCAwareThreadPoolExecutor( RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); this.enableSubmitterStackTrace = true; - this.beforeExecuteTask = NOOP; - } - - public MDCAwareThreadPoolExecutor( - int corePoolSize, - int maximumPoolSize, - int keepAliveTime, - TimeUnit timeUnit, - BlockingQueue blockingQueue, - SolrNamedThreadFactory httpShardExecutor, - boolean enableSubmitterStackTrace) { - super( - corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue, httpShardExecutor); - this.enableSubmitterStackTrace = enableSubmitterStackTrace; - this.beforeExecuteTask = NOOP; - } - - public MDCAwareThreadPoolExecutor( - int i, - int maxValue, - long l, - TimeUnit timeUnit, - BlockingQueue es, - SolrNamedThreadFactory testExecutor, - boolean b) { - this(i, maxValue, l, timeUnit, es, testExecutor, b, NOOP); - } - - @Override - protected void beforeExecute(Thread t, Runnable r) { - this.beforeExecuteTask.run(); } @Override