Skip to content

Commit

Permalink
SOLR-17414: multiThreaded search: don't throw RejectedExecutionExcept…
Browse files Browse the repository at this point in the history
…ion (#2701)

When searching with multiThreaded=true, the internal tasks may now block instead of enqueuing with a risk of rejection. Solr will use less resources under stress but to get the most of your machine, you may want to increase the thread pool.
And:
* Revert ExecutorUtil change from SOLR-17298
* Revert ExecutorUtil change from SOLR-13350
* rename CoreContainer.collectorManager to indexSearcherExecutor
  • Loading branch information
dsmiley authored Oct 14, 2024
1 parent 8ea6525 commit 7405bb1
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 93 deletions.
6 changes: 5 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ New Features
---------------------
* SOLR-14496: Solr CLI commands now can interact with a Solr secured using Basic Authentication. (Eric Pugh)

* SOLR-17467: Solr CLI bin/solr start defaults to starting Solr in Cloud mode, use --user-managed switch for User Managed (aka Standalone) mode. (Eric Pugh)
* SOLR-17467: Solr CLI bin/solr start defaults to starting Solr in Cloud mode, use --user-managed switch for User Managed (aka Standalone) mode. (Eric Pugh)

Improvements
---------------------
Expand Down Expand Up @@ -141,6 +141,10 @@ Improvements
override an HTTP client's base URL may use `Http2SolrClient.requestWithBaseUrl` instead. (Jason Gerlowski,
Sanjay Dutt, David Smiley)

* SOLR-17414: When searching with multiThreaded=true, the internal tasks may now block instead of
enqueuing with a risk of rejection. Solr will use less resources under stress but to get the most
of your machine, you may want to increase the thread pool. (David Smiley)

Optimizations
---------------------
* SOLR-14985: Solrj CloudSolrClient with Solr URLs had serious performance regressions (since the
Expand Down
25 changes: 7 additions & 18 deletions solr/core/src/java/org/apache/solr/core/CoreContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
import static org.apache.solr.common.params.CommonParams.ZK_PATH;
import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
import static org.apache.solr.search.CpuAllowedLimit.TIMING_CONTEXT;
import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;

import com.github.benmanes.caffeine.cache.Interner;
Expand Down Expand Up @@ -144,6 +143,7 @@
import org.apache.solr.search.CacheConfig;
import org.apache.solr.search.SolrCache;
import org.apache.solr.search.SolrFieldCacheBean;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.security.AllowListUrlChecker;
import org.apache.solr.security.AuditLoggerPlugin;
import org.apache.solr.security.AuthenticationPlugin;
Expand All @@ -158,7 +158,6 @@
import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.StartupLoggingUtils;
import org.apache.solr.util.ThreadCpuTimer;
import org.apache.solr.util.stats.MetricUtils;
import org.apache.zookeeper.KeeperException;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
Expand All @@ -183,8 +182,8 @@ public class CoreContainer {

final SolrCores solrCores;

public Executor getCollectorExecutor() {
return collectorExecutor;
public Executor getIndexSearcherExecutor() {
return indexSearcherExecutor;
}

public static class CoreLoadFailure {
Expand Down Expand Up @@ -290,7 +289,7 @@ public JerseyAppHandlerCache getJerseyAppHandlerCache() {

public final NodeRoles nodeRoles = new NodeRoles(System.getProperty(NodeRoles.NODE_ROLES_PROP));

private final ExecutorService collectorExecutor;
private final ExecutorService indexSearcherExecutor;

private final ClusterSingletons clusterSingletons =
new ClusterSingletons(
Expand Down Expand Up @@ -448,17 +447,7 @@ public CoreContainer(NodeConfig config, CoresLocator locator, boolean asyncSolrC

this.allowListUrlChecker = AllowListUrlChecker.create(config);

final int indexSearcherExecutorThreads = cfg.getIndexSearcherExecutorThreads();
if (0 < indexSearcherExecutorThreads) {
this.collectorExecutor =
ExecutorUtil.newMDCAwareFixedThreadPool(
indexSearcherExecutorThreads, // thread count
indexSearcherExecutorThreads * 1000, // queue size
new SolrNamedThreadFactory("searcherCollector"),
() -> ThreadCpuTimer.reset(TIMING_CONTEXT));
} else {
this.collectorExecutor = null;
}
this.indexSearcherExecutor = SolrIndexSearcher.initCollectorExecutor(cfg);
}

@SuppressWarnings({"unchecked"})
Expand Down Expand Up @@ -685,7 +674,7 @@ protected CoreContainer(Object testConstructor) {
distributedCollectionCommandRunner = Optional.empty();
allowPaths = null;
allowListUrlChecker = null;
collectorExecutor = null;
indexSearcherExecutor = null;
}

public static CoreContainer createAndLoad(Path solrHome) {
Expand Down Expand Up @@ -1288,7 +1277,7 @@ public void shutdown() {
}

ExecutorUtil.shutdownAndAwaitTermination(coreContainerAsyncTaskExecutor);
ExecutorUtil.shutdownAndAwaitTermination(collectorExecutor);
ExecutorUtil.shutdownAndAwaitTermination(indexSearcherExecutor);
ExecutorService customThreadPool =
ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool"));

Expand Down
1 change: 0 additions & 1 deletion solr/core/src/java/org/apache/solr/core/NodeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,6 @@ public static class NodeConfigBuilder {

public static final int DEFAULT_INDEX_SEARCHER_EXECUTOR_THREADS =
Runtime.getRuntime().availableProcessors();
;

private static final String DEFAULT_CORESLOCATORCLASS =
"org.apache.solr.core.CorePropertiesLocator";
Expand Down
48 changes: 47 additions & 1 deletion solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.solr.search;

import static org.apache.solr.search.CpuAllowedLimit.TIMING_CONTEXT;

import com.codahale.metrics.Gauge;
import java.io.Closeable;
import java.io.IOException;
Expand All @@ -31,6 +33,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -92,9 +96,12 @@
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.ExecutorUtil.MDCAwareThreadPoolExecutor;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.NodeConfig;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoBean;
Expand All @@ -115,6 +122,7 @@
import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.SolrIndexConfig;
import org.apache.solr.util.IOFunction;
import org.apache.solr.util.ThreadCpuTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -210,6 +218,44 @@ private static DirectoryReader wrapReader(SolrCore core, DirectoryReader reader)
return reader;
}

/**
* Create an {@link ExecutorService} to be used by the Lucene {@link IndexSearcher#getExecutor()}.
* Shared across the whole node because it's a machine CPU resource.
*/
public static ExecutorService initCollectorExecutor(NodeConfig cfg) {
final int indexSearcherExecutorThreads = cfg.getIndexSearcherExecutorThreads();
if (0 >= indexSearcherExecutorThreads) {
return null;
}

return new MDCAwareThreadPoolExecutor(
indexSearcherExecutorThreads,
indexSearcherExecutorThreads,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(true) { // fairness
// a hack to force ThreadPoolExecutor to block if threads are busy
// -- otherwise it will throw RejectedExecutionException; unacceptable
@Override
public boolean offer(Runnable runnable) { // is supposed to not block, but we do anyway
try {
put(runnable); // blocks
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("interrupted submitting to search multi-threaded pool", e);
}
return true;
}
},
new SolrNamedThreadFactory("searcherCollector")) {

@Override
protected void beforeExecute(Thread t, Runnable r) {
ThreadCpuTimer.reset(TIMING_CONTEXT);
}
};
}

/**
* Builds the necessary collector chain (via delegate wrapping) and executes the query against it.
* This method takes into consideration both the explicitly provided collector and postFilter as
Expand Down Expand Up @@ -336,7 +382,7 @@ public SolrIndexSearcher(
boolean reserveDirectory,
DirectoryFactory directoryFactory)
throws IOException {
super(wrapReader(core, r), core.getCoreContainer().getCollectorExecutor());
super(wrapReader(core, r), core.getCoreContainer().getIndexSearcherExecutor());

this.path = path;
this.directoryFactory = directoryFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,10 @@ Similar to using <<timeAllowed Parameter,the `timeAllowed` Parameter>>, when ear
|===

This parameter set to `true` or `false` controls if Solr may use more than one thread to satisfy the request.
A `true` value presently allows the IndexSearcher to search across Lucene's segments in parallel, and the xref:configuration-guide:configuring-solr-xml.adoc#indexSearcherExecutorThreads[indexSearcherExecutorThreads] value can be customised in the `solr.xml` file. This parameter is ignored in the presence of `&segmentsTerminateEarly=true` (future work may enable it). This is a new parameter and is considered experimental and subject to change or removal in subsequent releases. Please share your feedback and experiences with it on our mailing lists.
A `true` value presently allows the IndexSearcher to search across Lucene's segments in parallel, and the xref:configuration-guide:configuring-solr-xml.adoc#indexSearcherExecutorThreads[indexSearcherExecutorThreads] value can be customised in the `solr.xml` file.
This parameter is ignored in the presence of `&segmentsTerminateEarly=true` (future work may enable it).
This is a new parameter and is considered experimental and subject to change or removal in subsequent releases.
Please share your feedback and experiences with it on our mailing lists.

== omitHeader Parameter

Expand Down
73 changes: 2 additions & 71 deletions solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,6 @@ public static ExecutorService newMDCAwareFixedThreadPool(
nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
}

public static ExecutorService newMDCAwareFixedThreadPool(
int nThreads, int queueCapacity, ThreadFactory threadFactory, Runnable beforeExecute) {
return new MDCAwareThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueCapacity),
threadFactory,
beforeExecute);
}

/**
* See {@link java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)}. Note the
* thread is always active, even if no tasks are submitted to the executor.
Expand Down Expand Up @@ -275,10 +263,8 @@ public static ExecutorService newMDCAwareCachedThreadPool(
public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor {

private static final int MAX_THREAD_NAME_LEN = 512;
public static final Runnable NOOP = () -> {};

private final boolean enableSubmitterStackTrace;
private final Runnable beforeExecuteTask;

public MDCAwareThreadPoolExecutor(
int corePoolSize,
Expand All @@ -290,7 +276,6 @@ public MDCAwareThreadPoolExecutor(
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.enableSubmitterStackTrace = true;
this.beforeExecuteTask = NOOP;
}

public MDCAwareThreadPoolExecutor(
Expand All @@ -301,7 +286,6 @@ public MDCAwareThreadPoolExecutor(
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.enableSubmitterStackTrace = true;
this.beforeExecuteTask = NOOP;
}

public MDCAwareThreadPoolExecutor(
Expand All @@ -311,27 +295,7 @@ public MDCAwareThreadPoolExecutor(
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, true, NOOP);
}

public MDCAwareThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
Runnable beforeExecuteTask) {
this(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
true,
beforeExecuteTask);
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, true);
}

public MDCAwareThreadPoolExecutor(
Expand All @@ -341,11 +305,9 @@ public MDCAwareThreadPoolExecutor(
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
boolean enableSubmitterStackTrace,
Runnable beforeExecuteTask) {
boolean enableSubmitterStackTrace) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.enableSubmitterStackTrace = enableSubmitterStackTrace;
this.beforeExecuteTask = beforeExecuteTask;
}

public MDCAwareThreadPoolExecutor(
Expand All @@ -357,37 +319,6 @@ public MDCAwareThreadPoolExecutor(
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.enableSubmitterStackTrace = true;
this.beforeExecuteTask = NOOP;
}

public MDCAwareThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
int keepAliveTime,
TimeUnit timeUnit,
BlockingQueue<Runnable> blockingQueue,
SolrNamedThreadFactory httpShardExecutor,
boolean enableSubmitterStackTrace) {
super(
corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue, httpShardExecutor);
this.enableSubmitterStackTrace = enableSubmitterStackTrace;
this.beforeExecuteTask = NOOP;
}

public MDCAwareThreadPoolExecutor(
int i,
int maxValue,
long l,
TimeUnit timeUnit,
BlockingQueue<Runnable> es,
SolrNamedThreadFactory testExecutor,
boolean b) {
this(i, maxValue, l, timeUnit, es, testExecutor, b, NOOP);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
this.beforeExecuteTask.run();
}

@Override
Expand Down

0 comments on commit 7405bb1

Please sign in to comment.