Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17298 - ThreadCpuTimer safe for multi-threaded search #2595

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion solr/core/src/java/org/apache/solr/core/CoreContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
import static org.apache.solr.common.params.CommonParams.ZK_PATH;
import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
import static org.apache.solr.search.CpuAllowedLimit.TIMING_CONTEXT;
import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;

import com.github.benmanes.caffeine.cache.Interner;
Expand Down Expand Up @@ -155,6 +156,7 @@
import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.StartupLoggingUtils;
import org.apache.solr.util.ThreadCpuTimer;
import org.apache.solr.util.stats.MetricUtils;
import org.apache.zookeeper.KeeperException;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
Expand Down Expand Up @@ -448,7 +450,8 @@ public CoreContainer(NodeConfig config, CoresLocator locator, boolean asyncSolrC
ExecutorUtil.newMDCAwareFixedThreadPool(
indexSearcherExecutorThreads, // thread count
indexSearcherExecutorThreads * 1000, // queue size
new SolrNamedThreadFactory("searcherCollector"));
new SolrNamedThreadFactory("searcherCollector"),
() -> ThreadCpuTimer.reset(TIMING_CONTEXT));
} else {
this.collectorExecutor = null;
}
Expand Down
49 changes: 31 additions & 18 deletions solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.CpuAllowedLimit;
import org.apache.solr.search.QueryLimits;
import org.apache.solr.search.SyntaxError;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
Expand All @@ -62,6 +63,7 @@ public abstract class RequestHandlerBase
ApiSupport,
PermissionNameProvider {

public static final String REQUEST_CPU_TIMER_CONTEXT = "publishCpuTime";
protected NamedList<?> initArgs = null;
protected SolrParams defaults;
protected SolrParams appends;
Expand Down Expand Up @@ -217,12 +219,8 @@ public abstract void handleRequestBody(SolrQueryRequest req, SolrQueryResponse r

@Override
public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
ThreadCpuTimer threadCpuTimer = null;
if (publishCpuTime) {
threadCpuTimer =
SolrRequestInfo.getRequestInfo() == null
? new ThreadCpuTimer()
: SolrRequestInfo.getRequestInfo().getThreadCpuTimer();
ThreadCpuTimer.beginContext(REQUEST_CPU_TIMER_CONTEXT);
}
HandlerMetrics metrics = getMetricsForThisRequest(req);
metrics.requests.inc();
Expand All @@ -246,21 +244,36 @@ public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
processErrorMetricsOnException(normalized, metrics);
rsp.setException(normalized);
} finally {
long elapsed = timer.stop();
metrics.totalTime.inc(elapsed);

if (publishCpuTime && threadCpuTimer != null) {
Optional<Long> cpuTime = threadCpuTimer.getElapsedCpuMs();
if (cpuTime.isPresent()) {
// add CPU_TIME if not already added by SearchHandler
NamedList<Object> header = rsp.getResponseHeader();
if (header != null) {
if (header.get(ThreadCpuTimer.CPU_TIME) == null) {
header.add(ThreadCpuTimer.CPU_TIME, cpuTime.get());
try {
long elapsed = timer.stop();
metrics.totalTime.inc(elapsed);

if (publishCpuTime) {
Optional<Long> cpuTime = ThreadCpuTimer.readMSandReset(REQUEST_CPU_TIMER_CONTEXT);
if (QueryLimits.getCurrentLimits().isLimitsEnabled()) {
// prefer the value from the limit if available to avoid confusing users with trivial
// differences. Not fond of the spotless formatting here...
cpuTime =
Optional.ofNullable(
(Long)
QueryLimits.getCurrentLimits()
.currentLimitValueFor(CpuAllowedLimit.class)
.orElse(cpuTime.orElse(null)));
}
if (cpuTime.isPresent()) {
// add CPU_TIME if not already added by SearchHandler
NamedList<Object> header = rsp.getResponseHeader();
if (header != null) {
if (header.get(ThreadCpuTimer.CPU_TIME) == null) {
header.add(ThreadCpuTimer.CPU_TIME, cpuTime.get());
}
}
rsp.addToLog(ThreadCpuTimer.LOCAL_CPU_TIME, cpuTime.get());
}
rsp.addToLog(ThreadCpuTimer.LOCAL_CPU_TIME, cpuTime.get());
}
} finally {
// whatever happens be sure to clear things out at end of request.
ThreadCpuTimer.reset();
}
}
}
Expand Down
23 changes: 4 additions & 19 deletions solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.QueryLimits;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.ThreadCpuTimer;
import org.apache.solr.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,7 +45,6 @@ public class SolrRequestInfo {
private static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal =
ThreadLocal.withInitial(ArrayDeque::new);
static final Object LIMITS_KEY = new Object();
static final Object CPU_TIMER_KEY = new Object();

private int refCount = 1; // prevent closing when still used

Expand Down Expand Up @@ -80,10 +78,12 @@ public static void setRequestInfo(SolrRequestInfo info) {
assert false : "SolrRequestInfo Stack is full";
log.error("SolrRequestInfo Stack is full");
} else if (!stack.isEmpty() && info.req != null) {
// New SRI instances inherit limits and thread CPU from prior SRI regardless of parameters.
// New SRI instances inherit limits from prior SRI regardless of parameters.
// This ensures these two properties cannot be changed or removed for a given thread once set.
// if req is null then limits will be an empty instance with no limits anyway.
info.req.getContext().put(CPU_TIMER_KEY, stack.peek().getThreadCpuTimer());

// protected by !stack.isEmpty()
// noinspection DataFlowIssue
info.req.getContext().put(LIMITS_KEY, stack.peek().getLimits());
}
// this creates both new QueryLimits and new ThreadCpuTime if not already set
Expand Down Expand Up @@ -244,27 +244,12 @@ private void initQueryLimits() {
*/
public QueryLimits getLimits() {
// make sure the ThreadCpuTime is always initialized
// NOTE(review): the call below looks like a leftover from before CPU timing was moved out of
// SolrRequestInfo — confirm whether it (and the comment above) should still be here.
getThreadCpuTimer();
// With no request or no response the shared QueryLimits.NONE instance is returned; otherwise
// the QueryLimits for this request is created on first access and cached in the request
// context under LIMITS_KEY, so later calls return the same instance.
return req == null || rsp == null
? QueryLimits.NONE
: (QueryLimits)
req.getContext().computeIfAbsent(LIMITS_KEY, (k) -> new QueryLimits(req, rsp));
}

/**
 * Returns the thread CPU time monitor for the current request. When a request is present the
 * monitor is looked up in the request context under {@code CPU_TIMER_KEY} and created on first
 * access, so repeated calls see the same instance; when there is no request, a new instance is
 * returned on every call.
 *
 * @return the {@link ThreadCpuTimer} object for the current request.
 */
public ThreadCpuTimer getThreadCpuTimer() {
  if (req == null) {
    // nothing to cache the timer in, hand out a fresh one
    return new ThreadCpuTimer();
  }
  Object timer = req.getContext().computeIfAbsent(CPU_TIMER_KEY, key -> new ThreadCpuTimer());
  return (ThreadCpuTimer) timer;
}

/**
 * Accessor for the {@code action} held by this request info.
 *
 * @return the stored {@link SolrDispatchFilter.Action}
 */
public SolrDispatchFilter.Action getAction() {
  return this.action;
}
Expand Down
66 changes: 55 additions & 11 deletions solr/core/src/java/org/apache/solr/search/CpuAllowedLimit.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,23 @@
*/
package org.apache.solr.search;

import static org.apache.solr.util.ThreadCpuTimer.readNSAndReset;

import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.jcip.annotations.NotThreadSafe;
import org.apache.lucene.index.QueryTimeout;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.ThreadCpuTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Enforces a CPU-time based timeout on a given SolrQueryRequest, as specified by the {@code
Expand All @@ -37,26 +46,25 @@
* @see ThreadCpuTimer
*/
@NotThreadSafe
public class CpuAllowedLimit implements QueryTimeout {
private final ThreadCpuTimer threadCpuTimer;
public class CpuAllowedLimit implements QueryLimit {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final long requestedTimeoutNs;
private volatile long timedOutAt = 0L;
AtomicLong accumulatedTime = new AtomicLong(0);
public static final String TIMING_CONTEXT = CpuAllowedLimit.class.getName();

/**
* Create an object to represent a CPU time limit for the current request. NOTE: this
* implementation will attempt to obtain an existing thread CPU time monitor, created when {@link
* SolrRequestInfo#getThreadCpuTimer()} is initialized.
* QueryLimits#QueryLimits(SolrQueryRequest, SolrQueryResponse)} is called.
*
* @param req solr request with a {@code cpuAllowed} parameter
*/
public CpuAllowedLimit(SolrQueryRequest req) {
if (!ThreadCpuTimer.isSupported()) {
throw new IllegalArgumentException("Thread CPU time monitoring is not available.");
}
SolrRequestInfo solrRequestInfo = SolrRequestInfo.getRequestInfo();
// get existing timer if available to ensure sub-queries can't reset/exceed the intended time
// constraint.
threadCpuTimer =
solrRequestInfo != null ? solrRequestInfo.getThreadCpuTimer() : new ThreadCpuTimer();
long reqCpuLimit = req.getParams().getLong(CommonParams.CPU_ALLOWED, -1L);

if (reqCpuLimit <= 0L) {
Expand All @@ -65,11 +73,15 @@ public CpuAllowedLimit(SolrQueryRequest req) {
}
// calculate the time when the limit is reached, e.g. account for the time already spent
requestedTimeoutNs = TimeUnit.NANOSECONDS.convert(reqCpuLimit, TimeUnit.MILLISECONDS);

// here we rely on the current thread never creating a second CpuAllowedLimit within the same
// request, and also rely on it always creating a new CpuAllowedLimit object for each
// request that requires it.
ThreadCpuTimer.beginContext(TIMING_CONTEXT);
}

/**
 * Test-only constructor that takes the CPU limit directly instead of reading it from a request.
 * Unlike the request-based constructor, it does not call {@code
 * ThreadCpuTimer.beginContext(TIMING_CONTEXT)}.
 *
 * @param limitMs the CPU time limit in milliseconds; it is stored in nanoseconds
 */
@VisibleForTesting
CpuAllowedLimit(long limitMs) {
// NOTE(review): this assignment looks like a pre-change line kept by the diff rendering (the
// request-based constructor no longer ends with a timer assignment) — confirm.
this.threadCpuTimer = new ThreadCpuTimer();
// the limit is compared against nanosecond readings, so convert once here
requestedTimeoutNs = TimeUnit.NANOSECONDS.convert(limitMs, TimeUnit.MILLISECONDS);
}

Expand All @@ -81,6 +93,38 @@ static boolean hasCpuLimit(SolrQueryRequest req) {
/**
 * Return true if usage has exceeded the limit.
 *
 * <p>Each call takes a reading for {@code TIMING_CONTEXT} via {@code readNSAndReset} (presumably
 * the CPU nanoseconds used since the previous reading — confirm against ThreadCpuTimer), adds it
 * to {@code accumulatedTime}, and compares the running total with {@code requestedTimeoutNs}.
 * Once the total has exceeded the limit, {@code timedOutAt} is set and every later call returns
 * true without taking another reading. When trace logging is enabled, each check logs the
 * measured delta, the accumulated total, the limit and the current thread name.
 */
@Override
public boolean shouldExit() {
// NOTE(review): the next line looks like the pre-change implementation left in by the diff
// rendering; as written it returns unconditionally and makes everything below it unreachable
// — confirm which version is intended.
return threadCpuTimer.getElapsedCpuNs() > requestedTimeoutNs;
// a non-zero timedOutAt means an earlier call already found the limit exceeded
if (timedOutAt > 0L) {
return true;
}
// if unsupported, use zero, and thus never exit, expect jvm and/or cpu branch
// prediction to short circuit things if unsupported.
Long delta = readNSAndReset(TIMING_CONTEXT).orElse(0L);
try {
if (accumulatedTime.addAndGet(delta) > requestedTimeoutNs) {
// remember the accumulated total at which the limit was found to be exceeded
timedOutAt = accumulatedTime.get();
return true;
}
return false;
} finally {
// runs on both return paths above, so every check is traced when trace logging is on
if (log.isTraceEnabled()) {
java.text.DecimalFormatSymbols symbols = new DecimalFormatSymbols(Locale.US);
DecimalFormat formatter = new DecimalFormat("#,###", symbols);
String threadName = Thread.currentThread().getName();
String deltaFmt = formatter.format(delta);
String accumulated = formatter.format(accumulatedTime.get());
String timeoutForComparison = formatter.format(requestedTimeoutNs);
log.trace(
"++++++++++++ SHOULD_EXIT - measuredDelta:{} accumulated:{} vs {} ++++ ON:{}",
deltaFmt,
accumulated,
timeoutForComparison,
threadName);
}
}
}

/**
 * Reports how much has been counted against the limit so far: the value stored in {@code
 * timedOutAt} once it is non-zero, otherwise the running total held in {@code accumulatedTime}.
 */
@Override
public Object currentValue() {
  if (timedOutAt > 0) {
    return timedOutAt;
  }
  return accumulatedTime.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ SearchResult searchCollectorManagers(
}

static boolean allowMT(DelegatingCollector postFilter, QueryCommand cmd, Query query) {
if (postFilter != null
|| cmd.getSegmentTerminateEarly()
|| cmd.getTimeAllowed() > 0
|| !cmd.getMultiThreaded()) {
if (postFilter != null || cmd.getSegmentTerminateEarly() || !cmd.getMultiThreaded()) {
return false;
} else {
MTCollectorQueryCheck allowMT = new MTCollectorQueryCheck();
Expand Down
31 changes: 31 additions & 0 deletions solr/core/src/java/org/apache/solr/search/QueryLimit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search;

import org.apache.lucene.index.QueryTimeout;

/**
 * A {@link QueryTimeout} that can additionally report how much of its limit has been consumed so
 * far, via {@link #currentValue()}.
 */
public interface QueryLimit extends QueryTimeout {
/**
* A value representing the portion of the specified limit that has been consumed. Reading this
* value should never affect the outcome (other than the time it takes to do it).
*
* @return an expression of the amount of the limit used so far, numeric if possible, if
* non-numeric it should have toString() suitable for logging or similar expression to the
* user.
*/
Object currentValue();
}
12 changes: 11 additions & 1 deletion solr/core/src/java/org/apache/solr/search/QueryLimits.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.lucene.index.QueryTimeout;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.request.SolrQueryRequest;
Expand All @@ -34,7 +35,7 @@
* return true the next time it is checked (it may be checked in either Lucene code or Solr code)
*/
public class QueryLimits implements QueryTimeout {
private final List<QueryTimeout> limits =
private final List<QueryLimit> limits =
new ArrayList<>(3); // timeAllowed, cpu, and memory anticipated

public static QueryLimits NONE = new QueryLimits();
Expand Down Expand Up @@ -149,6 +150,15 @@ public String limitStatusMessage() {
return sb.toString();
}

/**
 * Looks up the current value of the first registered limit that is an instance of the given
 * class.
 *
 * @param limitClass the type of limit to look for; instances of its subclasses match as well
 * @return the matching limit's {@link QueryLimit#currentValue()}, or an empty Optional when no
 *     registered limit is of that type
 */
public Optional<Object> currentLimitValueFor(Class<? extends QueryLimit> limitClass) {
  for (QueryLimit limit : limits) {
    // isInstance asks "is this limit a limitClass?". The reversed
    // limit.getClass().isAssignableFrom(limitClass) check would miss registered subclasses of
    // limitClass and would match limits whose class is merely a superclass of it.
    if (limitClass.isInstance(limit)) {
      return Optional.of(limit.currentValue());
    }
  }
  return Optional.empty();
}

/** Return true if there are any limits enabled for the current request. */
public boolean isLimitsEnabled() {
return !limits.isEmpty();
Expand Down
12 changes: 5 additions & 7 deletions solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -1921,9 +1921,7 @@ public ScoreMode scoreMode() {
final TopDocs topDocs;
final ScoreMode scoreModeUsed;
if (!MultiThreadedSearcher.allowMT(pf.postFilter, cmd, query)) {
if (log.isDebugEnabled()) {
log.debug("skipping collector manager");
}
log.trace("SINGLE THREADED search, skipping collector manager in getDocListNC");
final TopDocsCollector<?> topCollector = buildTopDocsCollector(len, cmd);
MaxScoreCollector maxScoreCollector = null;
Collector collector = topCollector;
Expand All @@ -1942,9 +1940,7 @@ public ScoreMode scoreMode() {
? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore())
: 0.0f;
} else {
if (log.isDebugEnabled()) {
log.debug("using CollectorManager");
}
log.trace("MULTI-THREADED search, using CollectorManager int getDocListNC");
final MultiThreadedSearcher.SearchResult searchResult =
new MultiThreadedSearcher(this)
.searchCollectorManagers(len, cmd, query, true, needScores, false);
Expand Down Expand Up @@ -2046,6 +2042,8 @@ public ScoreMode scoreMode() {
} else {
final TopDocs topDocs;
if (!MultiThreadedSearcher.allowMT(pf.postFilter, cmd, query)) {
log.trace("SINGLE THREADED search, skipping collector manager in getDocListAndSetNC");

@SuppressWarnings({"rawtypes"})
final TopDocsCollector<? extends ScoreDoc> topCollector = buildTopDocsCollector(len, cmd);
final DocSetCollector setCollector = new DocSetCollector(maxDoc);
Expand All @@ -2072,7 +2070,7 @@ public ScoreMode scoreMode() {
? (maxScoreCollector == null ? Float.NaN : maxScoreCollector.getMaxScore())
: 0.0f;
} else {
log.debug("using CollectorManager");
log.trace("MULTI-THREADED search, using CollectorManager in getDocListAndSetNC");

boolean needMaxScore = needScores;
MultiThreadedSearcher.SearchResult searchResult =
Expand Down
Loading