diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 174deadd4fb..ed0b2a48654 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -26,6 +26,7 @@ import static org.apache.solr.common.params.CommonParams.METRICS_PATH; import static org.apache.solr.common.params.CommonParams.ZK_PATH; import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH; +import static org.apache.solr.search.CpuAllowedLimit.TIMING_CONTEXT; import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP; import com.github.benmanes.caffeine.cache.Interner; @@ -155,6 +156,7 @@ import org.apache.solr.util.OrderedExecutor; import org.apache.solr.util.RefCounted; import org.apache.solr.util.StartupLoggingUtils; +import org.apache.solr.util.ThreadCpuTimer; import org.apache.solr.util.stats.MetricUtils; import org.apache.zookeeper.KeeperException; import org.glassfish.hk2.utilities.binding.AbstractBinder; @@ -448,7 +450,8 @@ public CoreContainer(NodeConfig config, CoresLocator locator, boolean asyncSolrC ExecutorUtil.newMDCAwareFixedThreadPool( indexSearcherExecutorThreads, // thread count indexSearcherExecutorThreads * 1000, // queue size - new SolrNamedThreadFactory("searcherCollector")); + new SolrNamedThreadFactory("searcherCollector"), + () -> ThreadCpuTimer.reset(TIMING_CONTEXT)); } else { this.collectorExecutor = null; } diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java index ff020ddebd3..d85b939fbb5 100644 --- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java +++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java @@ -43,8 +43,9 @@ import org.apache.solr.metrics.SolrMetricsContext; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestHandler; -import org.apache.solr.request.SolrRequestInfo; import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.search.CpuAllowedLimit; +import org.apache.solr.search.QueryLimits; import org.apache.solr.search.SyntaxError; import org.apache.solr.security.PermissionNameProvider; import org.apache.solr.update.processor.DistributedUpdateProcessor; @@ -62,6 +63,7 @@ public abstract class RequestHandlerBase ApiSupport, PermissionNameProvider { + public static final String REQUEST_CPU_TIMER_CONTEXT = "publishCpuTime"; protected NamedList initArgs = null; protected SolrParams defaults; protected SolrParams appends; @@ -217,12 +219,8 @@ public abstract void handleRequestBody(SolrQueryRequest req, SolrQueryResponse r @Override public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) { - ThreadCpuTimer threadCpuTimer = null; if (publishCpuTime) { - threadCpuTimer = - SolrRequestInfo.getRequestInfo() == null - ? new ThreadCpuTimer() - : SolrRequestInfo.getRequestInfo().getThreadCpuTimer(); + ThreadCpuTimer.beginContext(REQUEST_CPU_TIMER_CONTEXT); } HandlerMetrics metrics = getMetricsForThisRequest(req); metrics.requests.inc(); @@ -246,21 +244,36 @@ public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) { processErrorMetricsOnException(normalized, metrics); rsp.setException(normalized); } finally { - long elapsed = timer.stop(); - metrics.totalTime.inc(elapsed); - - if (publishCpuTime && threadCpuTimer != null) { - Optional cpuTime = threadCpuTimer.getElapsedCpuMs(); - if (cpuTime.isPresent()) { - // add CPU_TIME if not already added by SearchHandler - NamedList header = rsp.getResponseHeader(); - if (header != null) { - if (header.get(ThreadCpuTimer.CPU_TIME) == null) { - header.add(ThreadCpuTimer.CPU_TIME, cpuTime.get()); + try { + long elapsed = timer.stop(); + metrics.totalTime.inc(elapsed); + + if (publishCpuTime) { + Optional cpuTime = ThreadCpuTimer.readMSandReset(REQUEST_CPU_TIMER_CONTEXT); + if (QueryLimits.getCurrentLimits().isLimitsEnabled()) { + // prefer the value from the limit if available to avoid confusing users with trivial + // differences. Not fond of the spotless formatting here... + cpuTime = + Optional.ofNullable( + (Long) + QueryLimits.getCurrentLimits() + .currentLimitValueFor(CpuAllowedLimit.class) + .orElse(cpuTime.orElse(null))); + } + if (cpuTime.isPresent()) { + // add CPU_TIME if not already added by SearchHandler + NamedList header = rsp.getResponseHeader(); + if (header != null) { + if (header.get(ThreadCpuTimer.CPU_TIME) == null) { + header.add(ThreadCpuTimer.CPU_TIME, cpuTime.get()); + } } + rsp.addToLog(ThreadCpuTimer.LOCAL_CPU_TIME, cpuTime.get()); } - rsp.addToLog(ThreadCpuTimer.LOCAL_CPU_TIME, cpuTime.get()); } + } finally { + // whatever happens be sure to clear things out at end of request. + ThreadCpuTimer.reset(); } } } diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java index b7d0a11eb97..193b838e9cd 100644 --- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java +++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java @@ -33,7 +33,6 @@ import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.search.QueryLimits; import org.apache.solr.servlet.SolrDispatchFilter; -import org.apache.solr.util.ThreadCpuTimer; import org.apache.solr.util.TimeZoneUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,6 @@ public class SolrRequestInfo { private static final ThreadLocal> threadLocal = ThreadLocal.withInitial(ArrayDeque::new); static final Object LIMITS_KEY = new Object(); - static final Object CPU_TIMER_KEY = new Object(); private int refCount = 1; // prevent closing when still used @@ -80,10 +78,12 @@ public static void setRequestInfo(SolrRequestInfo info) { assert false : "SolrRequestInfo Stack is full"; log.error("SolrRequestInfo Stack is full"); } else if (!stack.isEmpty() && info.req != null) { - // New SRI instances inherit limits and thread CPU from prior SRI regardless of parameters. + // New SRI instances inherit limits from prior SRI regardless of parameters. // This ensures these two properties cannot be changed or removed for a given thread once set. // if req is null then limits will be an empty instance with no limits anyway. - info.req.getContext().put(CPU_TIMER_KEY, stack.peek().getThreadCpuTimer()); + + // protected by !stack.isEmpty() + // noinspection DataFlowIssue info.req.getContext().put(LIMITS_KEY, stack.peek().getLimits()); } // this creates both new QueryLimits and new ThreadCpuTime if not already set @@ -244,27 +244,12 @@ private void initQueryLimits() { */ public QueryLimits getLimits() { // make sure the ThreadCpuTime is always initialized - getThreadCpuTimer(); return req == null || rsp == null ? QueryLimits.NONE : (QueryLimits) req.getContext().computeIfAbsent(LIMITS_KEY, (k) -> new QueryLimits(req, rsp)); } - /** - * Get the thread CPU time monitor for the current request. This will either trigger the creation - * of a new instance if it hasn't been yet created, or will retrieve the already existing instance - * from the "bottom" of the request stack. - * - * @return the {@link ThreadCpuTimer} object for the current request. - */ - public ThreadCpuTimer getThreadCpuTimer() { - return req == null - ? new ThreadCpuTimer() - : (ThreadCpuTimer) - req.getContext().computeIfAbsent(CPU_TIMER_KEY, k -> new ThreadCpuTimer()); - } - public SolrDispatchFilter.Action getAction() { return action; } diff --git a/solr/core/src/java/org/apache/solr/search/CpuAllowedLimit.java b/solr/core/src/java/org/apache/solr/search/CpuAllowedLimit.java index aa3a1fae7b2..72362159f0b 100644 --- a/solr/core/src/java/org/apache/solr/search/CpuAllowedLimit.java +++ b/solr/core/src/java/org/apache/solr/search/CpuAllowedLimit.java @@ -16,14 +16,23 @@ */ package org.apache.solr.search; +import static org.apache.solr.util.ThreadCpuTimer.readNSAndReset; + import com.google.common.annotations.VisibleForTesting; +import java.lang.invoke.MethodHandles; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import net.jcip.annotations.NotThreadSafe; -import org.apache.lucene.index.QueryTimeout; import org.apache.solr.common.params.CommonParams; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestInfo; +import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.util.ThreadCpuTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Enforces a CPU-time based timeout on a given SolrQueryRequest, as specified by the {@code @@ -37,14 +46,18 @@ * @see ThreadCpuTimer */ @NotThreadSafe -public class CpuAllowedLimit implements QueryTimeout { - private final ThreadCpuTimer threadCpuTimer; +public class CpuAllowedLimit implements QueryLimit { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final long requestedTimeoutNs; + private volatile long timedOutAt = 0L; + AtomicLong accumulatedTime = new AtomicLong(0); + public static final String TIMING_CONTEXT = CpuAllowedLimit.class.getName(); /** * Create an object to represent a CPU time limit for the current request. NOTE: this * implementation will attempt to obtain an existing thread CPU time monitor, created when {@link - * SolrRequestInfo#getThreadCpuTimer()} is initialized. + * QueryLimits#QueryLimits(SolrQueryRequest, SolrQueryResponse)} is called. * * @param req solr request with a {@code cpuAllowed} parameter */ @@ -52,11 +65,6 @@ public CpuAllowedLimit(SolrQueryRequest req) { if (!ThreadCpuTimer.isSupported()) { throw new IllegalArgumentException("Thread CPU time monitoring is not available."); } - SolrRequestInfo solrRequestInfo = SolrRequestInfo.getRequestInfo(); - // get existing timer if available to ensure sub-queries can't reset/exceed the intended time - // constraint. - threadCpuTimer = - solrRequestInfo != null ? solrRequestInfo.getThreadCpuTimer() : new ThreadCpuTimer(); long reqCpuLimit = req.getParams().getLong(CommonParams.CPU_ALLOWED, -1L); if (reqCpuLimit <= 0L) { @@ -65,11 +73,15 @@ public CpuAllowedLimit(SolrQueryRequest req) { } // calculate the time when the limit is reached, e.g. account for the time already spent requestedTimeoutNs = TimeUnit.NANOSECONDS.convert(reqCpuLimit, TimeUnit.MILLISECONDS); + + // here we rely on the current thread never creating a second CpuAllowedLimit within the same + // request, and also rely on it always creating a new CpuAllowedLimit object for each + // request that requires it. + ThreadCpuTimer.beginContext(TIMING_CONTEXT); } @VisibleForTesting CpuAllowedLimit(long limitMs) { - this.threadCpuTimer = new ThreadCpuTimer(); requestedTimeoutNs = TimeUnit.NANOSECONDS.convert(limitMs, TimeUnit.MILLISECONDS); } @@ -81,6 +93,38 @@ static boolean hasCpuLimit(SolrQueryRequest req) { /** Return true if usage has exceeded the limit. */ @Override public boolean shouldExit() { - return threadCpuTimer.getElapsedCpuNs() > requestedTimeoutNs; + if (timedOutAt > 0L) { + return true; + } + // if unsupported, use zero, and thus never exit, expect jvm and/or cpu branch + // prediction to short circuit things if unsupported. + Long delta = readNSAndReset(TIMING_CONTEXT).orElse(0L); + try { + if (accumulatedTime.addAndGet(delta) > requestedTimeoutNs) { + timedOutAt = accumulatedTime.get(); + return true; + } + return false; + } finally { + if (log.isTraceEnabled()) { + java.text.DecimalFormatSymbols symbols = new DecimalFormatSymbols(Locale.US); + DecimalFormat formatter = new DecimalFormat("#,###", symbols); + String threadName = Thread.currentThread().getName(); + String deltaFmt = formatter.format(delta); + String accumulated = formatter.format(accumulatedTime.get()); + String timeoutForComparison = formatter.format(requestedTimeoutNs); + log.trace( + "++++++++++++ SHOULD_EXIT - measuredDelta:{} accumulated:{} vs {} ++++ ON:{}", + deltaFmt, + accumulated, + timeoutForComparison, + threadName); + } + } + } + + @Override + public Object currentValue() { + return timedOutAt > 0 ? timedOutAt : accumulatedTime.get(); } } diff --git a/solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java b/solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java index f9810b1a083..b56931a650e 100644 --- a/solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java +++ b/solr/core/src/java/org/apache/solr/search/MultiThreadedSearcher.java @@ -120,10 +120,7 @@ SearchResult searchCollectorManagers( } static boolean allowMT(DelegatingCollector postFilter, QueryCommand cmd, Query query) { - if (postFilter != null - || cmd.getSegmentTerminateEarly() - || cmd.getTimeAllowed() > 0 - || !cmd.getMultiThreaded()) { + if (postFilter != null || cmd.getSegmentTerminateEarly() || !cmd.getMultiThreaded()) { return false; } else { MTCollectorQueryCheck allowMT = new MTCollectorQueryCheck(); diff --git a/solr/core/src/java/org/apache/solr/search/QueryLimit.java b/solr/core/src/java/org/apache/solr/search/QueryLimit.java new file mode 100644 index 00000000000..1043707f06b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/search/QueryLimit.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.search; + +import org.apache.lucene.index.QueryTimeout; + +public interface QueryLimit extends QueryTimeout { + /** + * A value representing the portion of the specified limit that has been consumed. Reading this + * value should never affect the outcome (other than the time it takes to do it). + * + * @return an expression of the amount of the limit used so far, numeric if possible, if + * non-numeric it should have toString() suitable for logging or similar expression to the + * user. + */ + Object currentValue(); +} diff --git a/solr/core/src/java/org/apache/solr/search/QueryLimits.java b/solr/core/src/java/org/apache/solr/search/QueryLimits.java index 2e232bccde6..86c7f488de3 100644 --- a/solr/core/src/java/org/apache/solr/search/QueryLimits.java +++ b/solr/core/src/java/org/apache/solr/search/QueryLimits.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.lucene.index.QueryTimeout; import org.apache.solr.common.params.CommonParams; import org.apache.solr.request.SolrQueryRequest; @@ -34,7 +35,7 @@ * return true the next time it is checked (it may be checked in either Lucene code or Solr code) */ public class QueryLimits implements QueryTimeout { - private final List limits = + private final List limits = new ArrayList<>(3); // timeAllowed, cpu, and memory anticipated public static QueryLimits NONE = new QueryLimits(); @@ -149,6 +150,15 @@ public String limitStatusMessage() { return sb.toString(); } + public Optional currentLimitValueFor(Class limitClass) { + for (QueryLimit limit : limits) { + if (limit.getClass().isAssignableFrom(limitClass)) { + return Optional.of(limit.currentValue()); + } + } + return Optional.empty(); + } + /** Return true if there are any limits enabled for the current request. */ public boolean isLimitsEnabled() { return !limits.isEmpty(); diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java index 8204487fed3..db3940f464e 100644 --- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java +++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java @@ -1921,9 +1921,7 @@ public ScoreMode scoreMode() { final TopDocs topDocs; final ScoreMode scoreModeUsed; if (!MultiThreadedSearcher.allowMT(pf.postFilter, cmd, query)) { - if (log.isDebugEnabled()) { - log.debug("skipping collector manager"); - } + log.trace("SINGLE THREADED search, skipping collector manager in getDocListNC"); final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd); MaxScoreCollector maxScoreCollector = null; Collector collector = topCollector; @@ -1942,9 +1940,7 @@ public ScoreMode scoreMode() { ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) : 0.0f; } else { - if (log.isDebugEnabled()) { - log.debug("using CollectorManager"); - } + log.trace("MULTI-THREADED search, using CollectorManager int getDocListNC"); final MultiThreadedSearcher.SearchResult searchResult = new MultiThreadedSearcher(this) .searchCollectorManagers(len, cmd, query, true, needScores, false); @@ -2046,6 +2042,8 @@ public ScoreMode scoreMode() { } else { final TopDocs topDocs; if (!MultiThreadedSearcher.allowMT(pf.postFilter, cmd, query)) { + log.trace("SINGLE THREADED search, skipping collector manager in getDocListAndSetNC"); + @SuppressWarnings({"rawtypes"}) final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd); final DocSetCollector setCollector = new DocSetCollector(maxDoc); @@ -2072,7 +2070,7 @@ public ScoreMode scoreMode() { ? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore()) : 0.0f; } else { - log.debug("using CollectorManager"); + log.trace("MULTI-THREADED search, using CollectorManager in getDocListAndSetNC"); boolean needMaxScore = needScores; MultiThreadedSearcher.SearchResult searchResult = diff --git a/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java b/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java index 432993d6c43..f837712176c 100644 --- a/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java +++ b/solr/core/src/java/org/apache/solr/search/TimeAllowedLimit.java @@ -19,7 +19,6 @@ import static java.lang.System.nanoTime; import java.util.concurrent.TimeUnit; -import org.apache.lucene.index.QueryTimeout; import org.apache.solr.common.params.CommonParams; import org.apache.solr.request.SolrQueryRequest; @@ -30,9 +29,10 @@ * has {@code timeAllowed} set. Essentially only one timeAllowed can be specified for any thread * executing a query. This is to ensure that subqueries don't escape from the intended limit */ -public class TimeAllowedLimit implements QueryTimeout { +public class TimeAllowedLimit implements QueryLimit { private final long timeoutAt; + private final long timingSince; /** * Create an object to represent a time limit for the current request. @@ -50,9 +50,12 @@ public TimeAllowedLimit(SolrQueryRequest req) { throw new IllegalArgumentException( "Check for limit with hasTimeLimit(req) before creating a TimeAllowedLimit"); } - long timeAllowed = reqTimeAllowed - (long) req.getRequestTimer().getTime(); + long timeAlreadySpent = (long) req.getRequestTimer().getTime(); + long now = nanoTime(); + long timeAllowed = reqTimeAllowed - timeAlreadySpent; long nanosAllowed = TimeUnit.NANOSECONDS.convert(timeAllowed, TimeUnit.MILLISECONDS); - timeoutAt = nanoTime() + nanosAllowed; + timeoutAt = now + nanosAllowed; + timingSince = now - timeAlreadySpent; } /** Return true if the current request has a parameter with a valid value of the limit. */ @@ -65,4 +68,9 @@ static boolean hasTimeLimit(SolrQueryRequest req) { public boolean shouldExit() { return timeoutAt - nanoTime() < 0L; } + + @Override + public Object currentValue() { + return nanoTime() - timingSince; + } } diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java index 4f1547ccd00..8168f7beafe 100644 --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java +++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java @@ -16,11 +16,18 @@ */ package org.apache.solr.util; +import com.google.common.util.concurrent.AtomicDouble; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; import java.util.Collections; import java.util.HashSet; +import java.util.Locale; import java.util.Random; import java.util.Set; import java.util.Timer; @@ -32,13 +39,13 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.QueryTimeout; import org.apache.solr.common.NonExistentCoreException; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.util.Pair; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; +import org.apache.solr.search.QueryLimit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,10 +161,26 @@ static Random random() { // non-private for testing public static volatile AtomicInteger countDocSetDelays = new AtomicInteger(0); - public static volatile QueryTimeout queryTimeout = null; + public static volatile QueryLimit queryTimeout = null; public static volatile boolean failInExecutePlanAction = false; + public static volatile AtomicInteger cpuTimerDelayInjectedNS = null; + + private static final KeyPairGenerator kpg; + + static { + KeyPairGenerator generator; + try { + generator = KeyPairGenerator.getInstance("RSA"); + } catch (NoSuchAlgorithmException e) { + generator = null; + } + kpg = generator; + } + + private static volatile AtomicDouble cpuLoadPerKey = null; + /** * Defaults to false, If set to true, then {@link * #injectSkipIndexWriterCommitOnClose} will return true @@ -531,6 +554,55 @@ public static boolean injectDirectUpdateLatch() { return true; } + public static void measureCpu() { + if (kpg == null || cpuLoadPerKey != null) { + return; + } + long start = System.nanoTime(); + for (int i = 0; i < 100; i++) { + genKeyPairAndDiscard(); + } + // note that this is potentially imprecise because our thread could get paused in the middle of + // this, but + // it should give us some notion + long end = System.nanoTime(); + cpuLoadPerKey = new AtomicDouble((end - start) / 100.0); + log.info("CPU per key = {}", cpuLoadPerKey); + } + + private static void genKeyPairAndDiscard() { + kpg.initialize(1024); + KeyPair kp = kpg.generateKeyPair(); + // avoid this getting optimized away by logging it + if (log.isTraceEnabled()) { + log.trace("{}", kp.getPublic()); + } + } + + private static void wasteCpu(int nanos) { + double wasteMe = nanos; + double loadPerKey = cpuLoadPerKey.get(); + if (loadPerKey > nanos) { + java.text.DecimalFormatSymbols symbols = new DecimalFormatSymbols(Locale.US); + + DecimalFormat formatter = new DecimalFormat("#,###.00", symbols); + // yes this is still wasting formatting when not warn, but not important here. + log.warn( + "Test requests smaller simulated cpu lag than a single keypair generation actual lag is {} ns", + formatter.format(loadPerKey)); + } + do { + genKeyPairAndDiscard(); + } while ((wasteMe = wasteMe - loadPerKey) > 0.0); + } + + public static void injectCpuUseInSearcherCpuLimitCheck() { + if (LUCENE_TEST_CASE == null) return; + if (cpuTimerDelayInjectedNS != null) { + wasteCpu(cpuTimerDelayInjectedNS.get()); + } + } + public static boolean injectReindexFailure() { if (reindexFailure != null) { Random rand = random(); diff --git a/solr/core/src/java/org/apache/solr/util/ThreadCpuTimer.java b/solr/core/src/java/org/apache/solr/util/ThreadCpuTimer.java index 054a5310d14..17d61fe3748 100644 --- a/solr/core/src/java/org/apache/solr/util/ThreadCpuTimer.java +++ b/solr/core/src/java/org/apache/solr/util/ThreadCpuTimer.java @@ -19,24 +19,19 @@ import java.lang.invoke.MethodHandles; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import net.jcip.annotations.NotThreadSafe; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Allows tracking information about the current thread using the JVM's built-in management bean - * {@link java.lang.management.ThreadMXBean}. - * - *

Calling code should create an instance of this class when starting the operation, and then can - * get the {@link #getElapsedCpuMs()} at any time thereafter. - * - *

This class is irrevocably not thread safe. Never allow instances of this class to be exposed - * to more than one thread. Acquiring an external lock will not be sufficient. This class can be - * considered "lock-hostile" due to its caching of timing information for a specific thread. + * {@link java.lang.management.ThreadMXBean}. Methods on this class are safe for use on any thread, + * but will return different values for different threads by design. */ -@NotThreadSafe public class ThreadCpuTimer { private static final long UNSUPPORTED = -1; public static final String CPU_TIME = "cpuTime"; @@ -59,14 +54,14 @@ public class ThreadCpuTimer { } } - private final long startCpuTimeNanos; + private static final ThreadLocal> threadLocalTimer = + ThreadLocal.withInitial(ConcurrentHashMap::new); - /** - * Create an instance to track the current thread's usage of CPU. Usage information can later be - * retrieved by calling {@link #getElapsedCpuMs()}. Timing starts immediately upon construction. - */ - public ThreadCpuTimer() { - this.startCpuTimeNanos = getThreadTotalCpuNs(); + /* no instances shall be created. */ + private ThreadCpuTimer() {} + + public static void beginContext(String context) { + readNSAndReset(context); } public static boolean isSupported() { @@ -74,47 +69,64 @@ public static boolean isSupported() { } /** - * Return CPU time consumed by this thread since the construction of this timer object. + * Get the number of nanoseconds since the last time this thread took a reading + * for the supplied context. * - * @return current value, or {@link #UNSUPPORTED} if not supported. + * @param context An arbitrary name that code can supply to avoid clashing with other usages. + * @return An optional long which may be empty if + * java.lang.management.ManagementFactory#getThreadMXBean() is unsupported or otherwise + * unavailable. */ - public long getElapsedCpuNs() { - return this.startCpuTimeNanos != UNSUPPORTED - ? getThreadTotalCpuNs() - this.startCpuTimeNanos - : UNSUPPORTED; + public static Optional readNSAndReset(String context) { + // simulate heavy query and/or heavy CPU load in tests + TestInjection.injectCpuUseInSearcherCpuLimitCheck(); + if (THREAD_MX_BEAN == null) { + return Optional.empty(); + } else { + AtomicLong threadCpuTime = + threadLocalTimer + .get() + .computeIfAbsent( + context, (ctx) -> new AtomicLong(THREAD_MX_BEAN.getCurrentThreadCpuTime())); + long currentThreadCpuTime = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + long result = currentThreadCpuTime - threadCpuTime.get(); + threadCpuTime.set(currentThreadCpuTime); + return Optional.of(result); + } } /** - * Get the cpu time for the current thread since {@link Thread#start()} without throwing an - * exception. + * Discard any accumulated time for a given context since the last invocation. * - * @see ThreadMXBean#getCurrentThreadCpuTime() for important details - * @return the number of nanoseconds of cpu consumed by this thread since {@code Thread.start()}. + * @param context the context to reset */ - private long getThreadTotalCpuNs() { + public static void reset(String context) { if (THREAD_MX_BEAN != null) { - return THREAD_MX_BEAN.getCurrentThreadCpuTime(); - } else { - return UNSUPPORTED; + threadLocalTimer + .get() + .computeIfAbsent( + context, (ctx) -> new AtomicLong(THREAD_MX_BEAN.getCurrentThreadCpuTime())) + .set(THREAD_MX_BEAN.getCurrentThreadCpuTime()); } } + public static Optional readMSandReset(String context) { + return readNSAndReset(context) + .map((cpuTimeNs) -> TimeUnit.MILLISECONDS.convert(cpuTimeNs, TimeUnit.NANOSECONDS)); + } + /** - * Get the CPU usage information for the current thread since it created this {@link - * ThreadCpuTimer}. The result is undefined if called by any other thread. - * - * @return the thread's cpu since the creation of this {@link ThreadCpuTimer} instance. If the - * VM's cpu tracking is disabled, returned value will be {@link #UNSUPPORTED}. + * Cleanup method. This should be called at the very end of a request thread when it's absolutely + * sure no code will attempt a new reading. */ - public Optional getElapsedCpuMs() { - long cpuTimeNs = getElapsedCpuNs(); - return cpuTimeNs != UNSUPPORTED - ? Optional.of(TimeUnit.MILLISECONDS.convert(cpuTimeNs, TimeUnit.NANOSECONDS)) - : Optional.empty(); + public static void reset() { + threadLocalTimer.get().clear(); } @Override public String toString() { - return getElapsedCpuMs().map(String::valueOf).orElse("UNSUPPORTED"); + return THREAD_MX_BEAN == null + ? "UNSUPPORTED" + : "Timing contexts:" + threadLocalTimer.get().toString(); } } diff --git a/solr/core/src/test/org/apache/solr/search/CallerSpecificQueryLimit.java b/solr/core/src/test/org/apache/solr/search/CallerSpecificQueryLimit.java index 0d53b8653db..96a6bd5ee93 100644 --- a/solr/core/src/test/org/apache/solr/search/CallerSpecificQueryLimit.java +++ b/solr/core/src/test/org/apache/solr/search/CallerSpecificQueryLimit.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.apache.lucene.index.QueryTimeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +31,7 @@ * and optionally a method name, e.g. MoreLikeThisComponent or * ClusteringComponent.finishStage. */ -public class CallerSpecificQueryLimit implements QueryTimeout { +public class CallerSpecificQueryLimit implements QueryLimit { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); final StackWalker stackWalker = @@ -102,4 +101,9 @@ public boolean shouldExit() { } return matchingExpr.isPresent(); } + + @Override + public Object currentValue() { + return "This class just for testing, not a real limit"; + } } diff --git a/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java b/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java index ea02e448da8..1fd6c87616d 100644 --- a/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java +++ b/solr/core/src/test/org/apache/solr/search/TestCpuAllowedLimit.java @@ -20,12 +20,16 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.CloudUtil; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.index.NoMergePolicyFactory; +import org.apache.solr.util.TestInjection; import org.apache.solr.util.ThreadCpuTimer; +import org.junit.AfterClass; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; @@ -61,7 +65,12 @@ private static Path createConfigSet() throws Exception { } @BeforeClass - public static void setup() throws Exception { + public static void setupClass() throws Exception { + // Using NoMergePolicy and 100 commits we should get 100 segments (across all shards). + // At this point of writing MAX_SEGMENTS_PER_SLICE in lucene is 5, so we should be + // ensured that any multithreaded testing will create 20 executable tasks for the + // executor that was provided to index-searcher. + systemSetPropertySolrTestsMergePolicyFactory(NoMergePolicyFactory.class.getName()); System.setProperty(ThreadCpuTimer.ENABLE_CPU_TIME, "true"); Path configset = createConfigSet(); configureCluster(1).addConfig("conf", configset).configure(); @@ -73,8 +82,14 @@ public static void setup() throws Exception { cluster.getOpenOverseer().getSolrCloudManager(), "active", COLLECTION, clusterShape(3, 6)); for (int j = 0; j < 100; j++) { solrClient.add(COLLECTION, sdoc("id", "id-" + j, "val_i", j % 5)); + solrClient.commit(COLLECTION); // need to commit every doc to create many segments. } - solrClient.commit(COLLECTION); + } + + @AfterClass + public static void tearDownClass() { + TestInjection.cpuTimerDelayInjectedNS = null; + systemClearPropertySolrTestsMergePolicyFactory(); } @Test @@ -131,7 +146,9 @@ public void testDistribLimit() throws Exception { Number qtime = (Number) rsp.getHeader().get("QTime"); assertTrue("QTime expected " + qtime + " >> " + sleepMs, qtime.longValue() > sleepMs); assertNull("should not have partial results", rsp.getHeader().get("partialResults")); - + TestInjection.measureCpu(); + // 25 ms per 5 segments ~175ms each shard + TestInjection.cpuTimerDelayInjectedNS = new AtomicInteger(25_000_000); // timeAllowed set, should return partial results log.info("--- timeAllowed, partial results ---"); rsp = @@ -146,6 +163,28 @@ public void testDistribLimit() throws Exception { String.valueOf(sleepMs), "stages", "prepare,process", + "multiThreaded", + "false", + "timeAllowed", + "500")); + // System.err.println("rsp=" + rsp.jsonStr()); + assertNotNull("should have partial results", rsp.getHeader().get("partialResults")); + + log.info("--- timeAllowed, partial results, multithreading ---"); + rsp = + solrClient.query( + COLLECTION, + params( + "q", + "id:*", + "sort", + "id asc", + ExpensiveSearchComponent.SLEEP_MS_PARAM, + String.valueOf(sleepMs), + "stages", + "prepare,process", + "multiThreaded", + "true", "timeAllowed", "500")); // System.err.println("rsp=" + rsp.jsonStr()); @@ -161,15 +200,12 @@ public void testDistribLimit() throws Exception { "id:*", "sort", "id desc", - ExpensiveSearchComponent.CPU_LOAD_COUNT_PARAM, - "1", "stages", "prepare,process", "cpuAllowed", - "1000")); + "10000")); // System.err.println("rsp=" + rsp.jsonStr()); assertNull("should have full results", rsp.getHeader().get("partialResults")); - // cpuAllowed set, should return partial results log.info("--- cpuAllowed 1, partial results ---"); rsp = @@ -180,19 +216,17 @@ public void testDistribLimit() throws Exception { "id:*", "sort", "id desc", - ExpensiveSearchComponent.CPU_LOAD_COUNT_PARAM, - "10", "stages", "prepare,process", "cpuAllowed", - "50", + "100", "multiThreaded", "false")); // System.err.println("rsp=" + rsp.jsonStr()); assertNotNull("should have partial results", rsp.getHeader().get("partialResults")); // cpuAllowed set, should return partial results - log.info("--- cpuAllowed 2, partial results ---"); + log.info("--- cpuAllowed 2, partial results, multi-threaded ---"); rsp = solrClient.query( COLLECTION, @@ -201,14 +235,12 @@ public void testDistribLimit() throws Exception { "id:*", "sort", "id desc", - ExpensiveSearchComponent.CPU_LOAD_COUNT_PARAM, - "10", "stages", "prepare,process", "cpuAllowed", - "50", + "100", "multiThreaded", - "false")); + "true")); // System.err.println("rsp=" + rsp.jsonStr()); assertNotNull("should have partial results", rsp.getHeader().get("partialResults")); } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index a0444ea5a53..c917438249a 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -202,14 +202,15 @@ public static ExecutorService newMDCAwareFixedThreadPool( } public static ExecutorService newMDCAwareFixedThreadPool( - int nThreads, int queueCapacity, ThreadFactory threadFactory) { + int nThreads, int queueCapacity, ThreadFactory threadFactory, Runnable beforeExecute) { return new MDCAwareThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(queueCapacity), - threadFactory); + threadFactory, + beforeExecute); } /** @@ -257,8 +258,10 @@ public static ExecutorService newMDCAwareCachedThreadPool( public static class MDCAwareThreadPoolExecutor extends ThreadPoolExecutor { private static final int MAX_THREAD_NAME_LEN = 512; + public static final Runnable NOOP = () -> {}; private final boolean enableSubmitterStackTrace; + private final Runnable beforeExecuteTask; public MDCAwareThreadPoolExecutor( int corePoolSize, @@ -270,6 +273,7 @@ public MDCAwareThreadPoolExecutor( RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.enableSubmitterStackTrace = true; + this.beforeExecuteTask = NOOP; } public MDCAwareThreadPoolExecutor( @@ -280,6 +284,7 @@ public MDCAwareThreadPoolExecutor( BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.enableSubmitterStackTrace = true; + this.beforeExecuteTask = NOOP; } public MDCAwareThreadPoolExecutor( @@ -289,7 +294,8 @@ public MDCAwareThreadPoolExecutor( TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, true); + this( + corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, true, NOOP); } public MDCAwareThreadPoolExecutor( @@ -299,9 +305,30 @@ public MDCAwareThreadPoolExecutor( TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, - boolean enableSubmitterStackTrace) { + Runnable beforeExecuteTask) { + this( + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory, + true, + beforeExecuteTask); + } + + public MDCAwareThreadPoolExecutor( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + boolean enableSubmitterStackTrace, + Runnable beforeExecuteTask) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.enableSubmitterStackTrace = enableSubmitterStackTrace; + this.beforeExecuteTask = beforeExecuteTask; } public MDCAwareThreadPoolExecutor( @@ -313,6 +340,37 @@ public MDCAwareThreadPoolExecutor( RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); this.enableSubmitterStackTrace = true; + this.beforeExecuteTask = NOOP; + } + + public MDCAwareThreadPoolExecutor( + int corePoolSize, + int maximumPoolSize, + int keepAliveTime, + TimeUnit timeUnit, + BlockingQueue blockingQueue, + SolrNamedThreadFactory httpShardExecutor, + boolean enableSubmitterStackTrace) { + super( + corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue, httpShardExecutor); + this.enableSubmitterStackTrace = enableSubmitterStackTrace; + this.beforeExecuteTask = NOOP; + } + + public MDCAwareThreadPoolExecutor( + int i, + int maxValue, + long l, + TimeUnit timeUnit, + BlockingQueue es, + SolrNamedThreadFactory testExecutor, + boolean b) { + this(i, maxValue, l, timeUnit, es, testExecutor, b, NOOP); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + this.beforeExecuteTask.run(); } @Override