From ad5f7832478eb773503cb21fe935a3abf12c1e23 Mon Sep 17 00:00:00 2001 From: markharwood Date: Mon, 16 Dec 2013 13:56:13 +0000 Subject: [PATCH 1/6] =?UTF-8?q?A=20new=20generic=20timeout=20handler=20and?= =?UTF-8?q?=20associated=20modifications=20for=20more=20effectively=20time?= =?UTF-8?q?-limiting=20search=20requests.=20Special=20runtime=20exceptions?= =?UTF-8?q?=20can=20now=20short-cut=20the=20execution=20of=20calls=20to=20?= =?UTF-8?q?Lucene=20and=20are=20caught=20and=20reported=20back,=20not=20as?= =?UTF-8?q?=20a=20fatal=20error=20but=20the=20existing=20=E2=80=9CtimedOut?= =?UTF-8?q?=E2=80=9D=20flag=20in=20results.=20Phases=20like=20the=20FetchP?= =?UTF-8?q?hase=20can=20now=20exit=20early=20and=20so=20also=20have=20a=20?= =?UTF-8?q?timed-out=20status.=20The=20SearchPhaseController=20does=20its?= =?UTF-8?q?=20best=20to=20assemble=20whatever=20hits,=20aggregations=20and?= =?UTF-8?q?=20facets=20have=20been=20produced=20within=20the=20provided=20?= =?UTF-8?q?time=20limits=20rather=20than=20returning=20nothing=20and=20thr?= =?UTF-8?q?owing=20an=20error.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ActivityTimeMonitor is the new central class for efficiently monitoring all forms of thread overrun in a JVM. The SearchContext setup is modified to register the start and end of query tasks with ActivityTimeMonitor. Store.java is modified to add timeout checks (via calls to ATM) in the low-level file access routines by using a delegating wrapper for Lucene's IndexInput. ContextIndexSearcher is modified to catch and unwrap ActivityTimedOutExceptions that can now come out of the Lucene or script calls and report them as timeouts along with any partial results. FetchPhase is similarly modified to deal with the possibility of timeout errors. 
--- .../org/elasticsearch/ExceptionsHelper.java | 60 ++++ .../util/concurrent/ActivityTimeMonitor.java | 262 ++++++++++++++++++ .../concurrent/ActivityTimedOutException.java | 40 +++ .../org/elasticsearch/index/store/Store.java | 100 +++++++ .../groovy/GroovyScriptEngineService.java | 5 +- .../elasticsearch/search/SearchService.java | 15 +- .../controller/SearchPhaseController.java | 41 +-- .../search/fetch/FetchPhase.java | 14 +- .../search/fetch/FetchSearchResult.java | 22 +- .../search/internal/ContextIndexSearcher.java | 11 + .../search/internal/SearchContext.java | 34 +++ .../search/query/QueryPhase.java | 29 +- .../concurrent/ActivityTimeMonitorTest.java | 194 +++++++++++++ .../search/query/SimpleQueryTests.java | 2 +- .../search/timeout/SearchTimeoutTests.java | 109 +++++++- 15 files changed, 907 insertions(+), 31 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java create mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimedOutException.java create mode 100644 src/test/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitorTest.java diff --git a/src/main/java/org/elasticsearch/ExceptionsHelper.java b/src/main/java/org/elasticsearch/ExceptionsHelper.java index 7a2c4e91f22b8..36d1c9a67e0d3 100644 --- a/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -74,6 +74,39 @@ public static Throwable unwrapCause(Throwable t) { } return result; } + + + + /** + * Determines if a given exception has a particular type of exception + * as a cause. + * @param e The exception received + * @param exType The type of exception that is being looked for as a cause + * @return true if the exception or any of the nested exceptions revealed by + * getCause() are of the given exType. 
+ */ + public static boolean wasCausedBy(Throwable e, Class exType) { + if (exType == null) { + return false; + } + if (exType.isInstance(e)) { + return true; + } + Throwable cause = e.getCause(); + if (cause == e) { + return false; + } + while (cause != null) { + if (exType.isInstance(cause)) { + return true; + } + if (cause.getCause() == cause) { + break; + } + cause = cause.getCause(); + } + return false; + } public static String detailedMessage(Throwable t) { return detailedMessage(t, false, 0); @@ -184,4 +217,31 @@ public static boolean isOOM(Throwable t) { ) ); } + + /** + * Extracts ElasticsearchException causes from non-serializable exceptions e.g. + * those produced by script. + * @param e The exception received + * @return null or the top-most exception revealed by + * getCause() which is of the type ElasticsearchException + */ + public static ElasticsearchException extractElasticsearchCause(Throwable e) { + if (e instanceof ElasticsearchException) { + return (ElasticsearchException) e; + } + Throwable cause = e.getCause(); + if (cause == e) { + return null; + } + while (cause != null) { + if (cause instanceof ElasticsearchException) { + return (ElasticsearchException) cause; + } + if (cause.getCause() == cause) { + break; + } + cause = cause.getCause(); + } + return null; + } } diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java new file mode 100644 index 0000000000000..0c98452cb6f9f --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java @@ -0,0 +1,262 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Utility class which efficiently monitors time-limited activities. When + * start() is called on an ActivityTimeMonitor with a maximum length of time, + * the status will automatically be updated to OVERRUNNING after the allotted + * time has expired. If a call is made to checkForTimeout() when the status is + * OVERRUNNING this object will throw an {@link ActivityTimedOutException}. Calling + * stop() will reset the monitor status to inactive. It is only permitted + * to call start() when inactive so in order to establish a new deadline for an active + * activity you will need to call stop() first then call start() with the new deadline. + * When start() is called the ActivityTimeMonitor is bound to the current thread via + * a ThreadLocal and calling getCurrentThreadMonitor() will return the instance + * associated with the current thread. 
+ * + * A background thread is used to handle all instances and set the OVERRUNNING status + * at the appropriate points + */ +public class ActivityTimeMonitor implements Delayed { + + public enum ActivityStatus { + INACTIVE, ACTIVE, OVERRUNNING + } + private static final AtomicReferenceFieldUpdater<ActivityTimeMonitor, ActivityStatus> STATUS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ActivityTimeMonitor.class, ActivityStatus.class, "status"); // one shared updater for all instances + private volatile ActivityStatus status = ActivityStatus.INACTIVE; + private long scheduledTimeoutInMilliseconds = Long.MIN_VALUE; + private final Thread associatedThread; + + private ActivityTimeMonitor() { + associatedThread = Thread.currentThread(); + } + + public boolean moveToInactive() { // returns true if the status was ACTIVE or OVERRUNNING and is now INACTIVE + do { + ActivityStatus s = status; + switch (s) { + case INACTIVE: + return false; + case OVERRUNNING: + case ACTIVE: + if (STATUS_UPDATER.compareAndSet(this, s, ActivityStatus.INACTIVE)) { + return true; + } + break; // CAS lost a race with the monitor thread - re-read status and retry + default: assert false; + } + } while (true); + } + + public void moveToActive() { // INACTIVE to ACTIVE; throws if already ACTIVE + do { + ActivityStatus s = status; + switch (s) { + case ACTIVE: + throw new ElasticsearchIllegalArgumentException("Can't move to active - already active"); + case OVERRUNNING: + assert false; // NOTE(review): with assertions disabled this falls through and resets OVERRUNNING to ACTIVE - confirm intended + case INACTIVE: + if (STATUS_UPDATER.compareAndSet(this, s, ActivityStatus.ACTIVE)) { + return; + } + break; // CAS failed - re-read status and retry + default: assert false; + } + } while (true); + } + + public void moveToOverrun() { // ACTIVE to OVERRUNNING; a no-op when INACTIVE + do { + ActivityStatus s = status; + switch (s) { + case OVERRUNNING: + assert false; + case INACTIVE: + return; + case ACTIVE: + if (STATUS_UPDATER.compareAndSet(this, s, ActivityStatus.OVERRUNNING)) { + return; + } + break; // CAS lost a race with stop() on the owning thread - re-read status and retry + default: assert false; + } + } while (true); + } + + public ActivityStatus getStatus() { + return status; + } + + public void start(long maxTime) { // maxTime is in milliseconds from now + assert status == ActivityStatus.INACTIVE; + this.scheduledTimeoutInMilliseconds = System.currentTimeMillis() + maxTime; + moveToActive(); + timeoutMonitorThread.add(this); + timeoutStateTL.set(this); + } + + public void stop() { + if 
(moveToInactive()) { + // need to remove the old timeout entry from the monitor thread's queue - + // the status itself has already been reset to INACTIVE by moveToInactive() + timeoutMonitorThread.remove(this); + } + } + + public void checkForTimeout() { + assert assertThread(); + if (status == ActivityStatus.OVERRUNNING) { + throw new ActivityTimedOutException("Timed out thread " + Thread.currentThread().getName(), -getDelay(TimeUnit.MILLISECONDS)); // getDelay() is negative once the deadline has passed, so negate it to report a positive overrun + } + + } + + private boolean assertThread() { + assert associatedThread == Thread.currentThread() : "ActivityTimeMonitor is not threadsafe - can't be shared across threads"; + return true; + } + + // an internal thread that monitors timeout activity + private static final TimeoutMonitorThread timeoutMonitorThread; + // Thread-Local holding the current timeout state associated with threads + private static final ThreadLocal<ActivityTimeMonitor> timeoutStateTL = new ThreadLocal<>(); + + static { + timeoutMonitorThread = new TimeoutMonitorThread(); + timeoutMonitorThread.setDaemon(true); + timeoutMonitorThread.start(); + } + + /** + * + * @return the number of timed activities currently being monitored + */ + public static int getCurrentNumActivities() { + return timeoutMonitorThread.inboundMessageQueue.size(); + } + + /** + * Get the timeout status information for the current thread + * + * @return null or the current ActivityTimeMonitor for the current thread + */ + public static ActivityTimeMonitor getCurrentThreadMonitor() { + return timeoutStateTL.get(); + } + + /** + * Get the timeout status information for the current thread or create if it does not exist + * + * @return the current thread's ActivityTimeMonitor + */ + public static ActivityTimeMonitor getOrCreateCurrentThreadMonitor() { + + ActivityTimeMonitor result = timeoutStateTL.get(); + if(result==null){ + result=new ActivityTimeMonitor(); + timeoutStateTL.set(result); + } + assert result.assertThread(); + return result; + } + + @Override + public int compareTo(Delayed o) { + ActivityTimeMonitor other = 
(ActivityTimeMonitor) o; + return Long.compare(scheduledTimeoutInMilliseconds, other.scheduledTimeoutInMilliseconds); // no int-cast truncation, so ordering stays correct for deadlines more than ~24 days apart + } + + @Override + public long getDelay(TimeUnit unit) { + long now = System.currentTimeMillis(); + return unit.convert(scheduledTimeoutInMilliseconds - now, TimeUnit.MILLISECONDS); + } + + /** + * Helper method to start a new activity + * @param maxTimeInMilliseconds the maximum length of time allowed for an activity + * @return the existing monitor for the current thread or, a new one if none existed before + */ + public static ActivityTimeMonitor startActivity(long maxTimeInMilliseconds) { + ActivityTimeMonitor result = getCurrentThreadMonitor(); + if (result == null) { + result = new ActivityTimeMonitor(); + } + result.start(maxTimeInMilliseconds); + return result; + } + + /** + * Helper method called to mark the stopping of a timed activity associated with the current thread + */ + public static ActivityTimeMonitor stopActivity() { + ActivityTimeMonitor result = getCurrentThreadMonitor(); + if (result != null) { + result.stop(); + } + return result; + } + + + // A background thread used to set the timed-out status for threads when + // they occur + private static final class TimeoutMonitorThread extends Thread { + // A queue of Thread statuses sorted by their scheduled timeout time. 
+ final DelayQueue<ActivityTimeMonitor> inboundMessageQueue = new DelayQueue<>(); + + @Override + public void run() { + while (true) { + try { + // Block until the first ActivityTimeMonitor has reached its + // timeout then flag it as OVERRUNNING + inboundMessageQueue.take().moveToOverrun(); + } catch (InterruptedException e1) { + // Need to keep on trucking + } + } + } + + void remove(ActivityTimeMonitor state) { + assert state.getStatus() == ActivityStatus.INACTIVE; + inboundMessageQueue.remove(state); + } + + void add(ActivityTimeMonitor state) { + assert state.getStatus() == ActivityStatus.ACTIVE; + if (state.scheduledTimeoutInMilliseconds <= System.currentTimeMillis()) { + state.moveToOverrun(); + } else { + inboundMessageQueue.add(state); + } + } + } + + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimedOutException.java b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimedOutException.java new file mode 100644 index 0000000000000..39ac6f0c834ae --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimedOutException.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.ElasticsearchException; + +public class ActivityTimedOutException extends ElasticsearchException +{ + + private long overrunMilliseconds; + + public ActivityTimedOutException(String msg, long overrun) + { + super(msg); + this.overrunMilliseconds=overrun; + } + + public long getOverrunMilliseconds() + { + return overrunMilliseconds; + } + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 5095c477555fd..10826b05c9aa7 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -26,6 +26,7 @@ import org.apache.lucene.codecs.lucene46.Lucene46SegmentInfoFormat; import org.apache.lucene.index.*; import org.apache.lucene.store.*; +import org.apache.lucene.store.IOContext.Context; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.IOUtils; @@ -42,6 +43,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.settings.IndexSettings; @@ -421,9 +423,107 @@ public IndexInput openInput(String name, IOContext context) throws IOException { IOUtils.closeWhileHandlingException(in); } } + // Merges and other non-search operations are not + // subject to timeout constraints. Only add timeout + // capable wrappers for search operations. 
+ if (context.context == Context.READ) { + in = new TimeLimitedIndexInput(name, in, null); + } return in; } + class TimeLimitedIndexInput extends IndexInput { + + private final IndexInput in; + private final String name; + private final ActivityTimeMonitor activityState; + + public TimeLimitedIndexInput(String resourceDescription, IndexInput in, ActivityTimeMonitor monitor) { + super(resourceDescription); + this.in = in; + this.activityState = monitor; + this.name = resourceDescription; + } + + @Override + public void close() throws IOException { + try { + if (activityState != null) { + activityState.checkForTimeout(); + } + } finally { + in.close(); + } + } + + @Override + public long getFilePointer() { + if (activityState != null) { + activityState.checkForTimeout(); + } + return in.getFilePointer(); + } + + @Override + public void seek(long pos) throws IOException { + if (activityState != null) { + activityState.checkForTimeout(); + } + in.seek(pos); + } + + @Override + public long length() { + if (activityState != null) { + activityState.checkForTimeout(); + } + return in.length(); + } + + @Override + public byte readByte() throws IOException { + if (activityState != null) { + activityState.checkForTimeout(); + } + return in.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + if (activityState != null) { + activityState.checkForTimeout(); + } + in.readBytes(b, offset, len); + } + + @Override + public IndexInput clone() { + // because of reuse of IndexInput clones in search thread pools + // we may alternate between timed and untimed search operations + // and we cannot pre-empt here how this IndexInput might be used. + // So always need to assume the worst and wrap IndexInputs with + // timeout monitoring capabilities. 
+ return new TimeLimitedIndexInput(name, in.clone(), ActivityTimeMonitor.getOrCreateCurrentThreadMonitor()); + } + + @Override + public boolean equals(Object o) { + return in.equals(o); + } + + @Override + public int hashCode() { + return in.hashCode(); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + return new TimeLimitedIndexInput(sliceDescription, in.slice(sliceDescription, offset, length), this.activityState); + } + } + + + @Override public void close() throws IOException { assert false : "Nobody should close this directory except of the Store itself"; diff --git a/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java b/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java index 299d90e71a9c6..591f812aa0981 100644 --- a/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java +++ b/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java @@ -241,7 +241,10 @@ public Object run() { if (logger.isTraceEnabled()) { logger.trace("exception running Groovy script", e); } - throw new GroovyScriptExecutionException(ExceptionsHelper.detailedMessage(e)); + // Cause needs to be discovered by peeling away layers of non-serializable classes introduced by + // dynamic Groovy classes - was getting an "unrecognised class" error on the reducer node when deserializing + // unknown "cause" exceptions. 
+ throw new GroovyScriptExecutionException(ExceptionsHelper.detailedMessage(e), ExceptionsHelper.extractElasticsearchCause(e)); } } diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index 03cbb7995d471..1616d7e0b6660 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -134,6 +134,8 @@ public class SearchService extends AbstractLifecycleComponent { private final ImmutableMap elementParsers; + private final long defaultSearchTimeout; + @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, IndicesWarmer indicesWarmer, ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, @@ -150,6 +152,11 @@ public SearchService(Settings settings, ClusterService clusterService, IndicesSe this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; this.indicesQueryCache = indicesQueryCache; + /* + * TODO: This default timeout here should be used if no timeout on the request is set - we can + * randomly set this to high values to exercise it? 
+ */ + this.defaultSearchTimeout = componentSettings.getAsTime("default_timeout", timeValueMinutes(1)).millis(); TimeValue keepAliveInterval = componentSettings.getAsTime(KEEPALIVE_INTERVAL_COMPONENENT_KEY, timeValueMinutes(1)); // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes @@ -582,8 +589,14 @@ public void freeAllScrollContexts() { } private void contextProcessing(SearchContext context) { - // disable timeout while executing a search + // disable context tidy-ups while executing a search context.accessed(-1); + // start any timer required for this thread + if(context.timeoutInMillis()>0){ + // TODO could adjust remaining time spent executing this phase + // based on time already spent on prior phases + context.startTimedActivityForThisThread(context.timeoutInMillis()); + } } private void contextProcessedSuccessfully(SearchContext context) { diff --git a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index a42ffca50fca0..99d8f91055d97 100644 --- a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -312,9 +312,14 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray entry : fetchResults) { - entry.value.fetchResult().initCounter(); + FetchSearchResult fetchResult=entry.value.fetchResult(); + if(fetchResult.isTimedOut()) + { + timedOut=true; + } + fetchResult.initCounter(); } // merge hits @@ -326,21 +331,23 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray aggregationsList = new ArrayList<>(queryResults.size()); for (AtomicArray.Entry entry : queryResults) { - aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + if(!entry.value.queryResult().queryResult().searchTimedOut()){ + 
aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + } } aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService)); } diff --git a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index e070d212092e7..d998ffc69bdfb 100644 --- a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -23,9 +23,11 @@ import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.ReaderUtil; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.text.StringAndBytesText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.concurrent.ActivityTimedOutException; import org.elasticsearch.index.fieldvisitor.*; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.FieldMappers; @@ -80,8 +82,18 @@ public FetchPhase(HighlightPhase highlightPhase, ScriptFieldsFetchSubPhase scrip @Override public void preProcess(SearchContext context) { } - public void execute(SearchContext context) { + try{ + internalExecute(context); + }catch(RuntimeException e){ + if(ExceptionsHelper.wasCausedBy(e,ActivityTimedOutException.class)){ + context.fetchResult().setTimedOut(true); + }else { + throw e; + } + } + } + private void internalExecute(SearchContext context) { FieldsVisitor fieldsVisitor; List extractFieldNames = null; diff --git a/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index 0db1096ad208c..dca4bc46c51e6 100644 --- a/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ 
-23,12 +23,11 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.search.internal.InternalSearchHits.StreamContext; import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext; - /** * */ @@ -39,6 +38,7 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR private InternalSearchHits hits; // client side counter private transient int counter; + private boolean timedOut; public FetchSearchResult() { @@ -94,13 +94,27 @@ public static FetchSearchResult readFetchSearchResult(StreamInput in) throws IOE public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readLong(); - hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + timedOut = in.readBoolean(); + if (!timedOut) { + hits = InternalSearchHits.readSearchHits(in, + InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(id); - hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + out.writeBoolean(timedOut); + if (!timedOut) { + hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + } + } + public boolean isTimedOut() { + return timedOut; + } + + public void setTimedOut(boolean timedOut) { + this.timedOut=timedOut; } } diff --git a/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index fd1c1e513621e..ff36cef1dd27b 100644 --- 
a/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.search.*; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.MinimumScoreCollector; @@ -28,6 +29,7 @@ import org.elasticsearch.common.lucene.search.FilteredCollector; import org.elasticsearch.common.lucene.search.XCollector; import org.elasticsearch.common.lucene.search.XFilteredQuery; +import org.elasticsearch.common.util.concurrent.ActivityTimedOutException; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.search.dfs.CachedDfSource; import org.elasticsearch.search.internal.SearchContext.Lifetime; @@ -167,9 +169,18 @@ public void search(List leaves, Weight weight, Collector co assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set"; searchContext.queryResult().terminatedEarly(true); } + catch (RuntimeException e) { + if (ExceptionsHelper.wasCausedBy(e, ActivityTimedOutException.class)) { + searchContext.queryResult().searchTimedOut(true); + searchContext.queryResult().topDocs(new TopDocs(0, new ScoreDoc[0], 0f)); + } else { + throw e; + } + } if (terminateAfterSet && searchContext.queryResult().terminatedEarly() == null) { searchContext.queryResult().terminatedEarly(false); } + } else { super.search(leaves, weight, collector); } diff --git a/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/src/main/java/org/elasticsearch/search/internal/SearchContext.java index ac0ab04ed056e..341d4c87c5727 100644 --- a/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.lease.Releasable; 
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.filter.FilterCache; import org.elasticsearch.index.cache.fixedbitset.FixedBitSetFilterCache; @@ -74,14 +75,35 @@ public abstract class SearchContext implements Releasable { private static ThreadLocal current = new ThreadLocal<>(); public final static int DEFAULT_TERMINATE_AFTER = 0; + /** + * Sets the SearchContext for the current thread. No timeout countdown is started + * here, because the timeout value may not have been parsed yet; the clock only starts + * ticking once startTimedActivityForThisThread(long) is called. + * + * @param value The new current search context + */ public static void setCurrent(SearchContext value) { current.set(value); QueryParseContext.setTypes(value.types()); } public static void removeCurrent() { + //The JUnit test FieldDataTermsFilterTests calls removeCurrent without first calling setCurrent. + //Rather than change that I have chosen to make this method resilient in such cases. + SearchContext currentContext = current.get(); + if(currentContext!=null && currentContext.timeoutInMillis() > 0) + { + // It would have been cleaner and more symmetrical to have the setCurrent/removeCurrent methods + // in this class call the timeout start/stop functions but there are 2 complications: + // 1) The parsing logic means that the setCurrent() method is called before the timeout value + // has actually been parsed. + // 2) It is conceivable that the timeout setting may be adjusted in-flight eg a future admin tool might revise + // the time allowed for a query to run.
+ ActivityTimeMonitor.stopActivity(); + } current.remove(); QueryParseContext.removeTypes(); + assert ActivityTimeMonitor.getCurrentThreadMonitor() == null || ActivityTimeMonitor.getCurrentThreadMonitor().getStatus() == ActivityTimeMonitor.ActivityStatus.INACTIVE; } public static SearchContext current() { @@ -374,4 +396,16 @@ public enum Lifetime { */ CONTEXT; } + + public void startTimedActivityForThisThread(long timeoutInMillis) { + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + if (atm != null) { + // We had a prior timeout threshold in place that needs revising + atm.stop(); + } + if (timeoutInMillis() > 0) { + ActivityTimeMonitor.startActivity(timeoutInMillis); + } + } + } diff --git a/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 18d49406d0e97..a90a77f7f9f5f 100644 --- a/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -24,9 +24,11 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.concurrent.ActivityTimedOutException; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchPhase; import org.elasticsearch.search.aggregations.AggregationPhase; @@ -155,14 +157,31 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep } searchContext.queryResult().topDocs(topDocs); } catch (Throwable e) { - throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e); + if(ExceptionsHelper.wasCausedBy(e,ActivityTimedOutException.class)){ + 
searchContext.queryResult().searchTimedOut(true); + searchContext.queryResult().topDocs(new TopDocs(0, new ScoreDoc[0], 0f)); + }else + { + throw ExceptionsHelper.convertToRuntime(e); + } } finally { searchContext.searcher().finishStage(ContextIndexSearcher.Stage.MAIN_QUERY); } - if (rescore) { // only if we do a regular search - rescorePhase.execute(searchContext); + //We don't want to do any more than necessary if timed out + if(!searchContext.queryResult().searchTimedOut()){ + try{ + if (rescore) { // only if we do a regular search + rescorePhase.execute(searchContext); + } + suggestPhase.execute(searchContext); + aggregationPhase.execute(searchContext); + } catch (RuntimeException rte) { + if (ExceptionsHelper.wasCausedBy(rte, ActivityTimedOutException.class)) { + searchContext.queryResult().searchTimedOut(true); + } else { + throw rte; + } + } } - suggestPhase.execute(searchContext); - aggregationPhase.execute(searchContext); } } diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitorTest.java b/src/test/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitorTest.java new file mode 100644 index 0000000000000..df378d8020998 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitorTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.ArrayList; +/** + * Tests that multi-threaded activities with various timeout settings can + * run simultaneously and still trigger timely and appropriate timeout errors. + */ +public class ActivityTimeMonitorTest extends ElasticsearchTestCase{ + + public static final int MAXIMUM_TIMING_DISCREPANCY_MS = 50; + + + @Test + public void testSingleTimeout() throws InterruptedException { + checkTimeoutsFireCorrectly(new TimedActivity(500, 100, 6)); + } + + @Test + public void testSingleNoTimeout() throws InterruptedException { + + checkTimeoutsFireCorrectly(new TimedActivity(500, 10, 5)); + } + + @Test + public void testOneLongNoTimeoutOneShortTimeout() throws InterruptedException { + + TimedActivity goodLong = new TimedActivity(500, 10, 5); + TimedActivity shortBad = new TimedActivity(100, 50, 5); + checkTimeoutsFireCorrectly(goodLong,shortBad); + } + @Test + public void testOneLongNoTimeoutManyShortTimeout() throws InterruptedException { + + ArrayList activities = new ArrayList(); + TimedActivity goodLong = new TimedActivity(500, 10, 5); + int numShorts = 10; + activities.add(goodLong); + for (int i = 0; i < numShorts; i++) { + activities.add(new TimedActivity(100, 50, 5)); + } + checkTimeoutsFireCorrectly((TimedActivity[]) activities.toArray(new TimedActivity[activities.size()])); + } + @Test + public void testOneLongTimeoutManyShortNoTimeout() throws InterruptedException { + ArrayList 
activities = new ArrayList(); + TimedActivity goodLong = new TimedActivity(500, 5, 120); + int numShorts = 10; + activities.add(goodLong); + for (int i = 0; i < numShorts; i++) { + activities.add(new TimedActivity(100, 5, 5)); + } + checkTimeoutsFireCorrectly((TimedActivity[]) activities.toArray(new TimedActivity[activities.size()])); + } + + + @Test + public void testTimerReset() throws InterruptedException { + ActivityTimeMonitor atm = ActivityTimeMonitor.getOrCreateCurrentThreadMonitor(); + atm.start(500); + Thread.sleep(100); + atm.checkForTimeout(); + atm.stop(); + atm.start(1000); // extend the time allotted to this thread + Thread.sleep(500); + atm.checkForTimeout(); + atm.stop(); + } + + @Test + public void testRandomThreads() throws InterruptedException { + // Tests for a mix of threads - some of which are expected to timeout + // while others not + int numThreads = 100; + int maxTimeToTake = 2000; + int maxNumChecks = 50; + int minTimeBetweenChecks = 5; + int maxTimeBetweenChecks = 50; + + ArrayList activities = new ArrayList(numThreads); + for (int i = 0; i < numThreads; i++) { + int maxTime = (int) (Math.random() * maxTimeToTake); + int numChecks = 1 + (int) (Math.random() * maxNumChecks); + int timeBetweenChecks = minTimeBetweenChecks + (int) (Math.random() * maxTimeBetweenChecks); + activities.add(new TimedActivity(maxTime, timeBetweenChecks, numChecks)); + } + checkTimeoutsFireCorrectly((TimedActivity[]) activities.toArray(new TimedActivity[activities.size()])); + int numTimeouts = 0; + for (TimedActivity timedActivity : activities) { + if (timedActivity.didTimeout) { + numTimeouts++; + } + } + assertTrue("Invalid test parameters - failed to produce a timeout condition", numTimeouts > 0); + assertEquals("All activities should be completed", 0, ActivityTimeMonitor.getCurrentNumActivities()); + } + + public void checkTimeoutsFireCorrectly(TimedActivity... 
activities ) throws InterruptedException{ + Thread threads[] = new Thread[activities.length]; + for (int i = 0; i < activities.length; i++) { + threads[i] = new Thread(activities[i]); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + for (int i = 0; i < activities.length; i++) { + activities[i].checkAssertions(); + } + } + + + static class TimedActivity implements Runnable{ + int maxTimePermitted; + int checkForTimeoutEvery; + int numberOfChecks; + boolean didTimeout = false; + private long timeTaken; + + public TimedActivity(int maxTimePermitted, int checkForTimeoutEvery, int numberOfChecks) { + super(); + this.maxTimePermitted = maxTimePermitted; + this.checkForTimeoutEvery = checkForTimeoutEvery; + this.numberOfChecks = numberOfChecks; + } + + @Override + public void run() { + long start = System.currentTimeMillis(); + ActivityTimeMonitor.getOrCreateCurrentThreadMonitor().start(maxTimePermitted); + try { + try { + for (int i = 0; i < numberOfChecks; i++) { + Thread.sleep(checkForTimeoutEvery); + ActivityTimeMonitor.getOrCreateCurrentThreadMonitor().checkForTimeout(); + } + } catch (ActivityTimedOutException e) { + didTimeout = true; + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } finally { + ActivityTimeMonitor.getOrCreateCurrentThreadMonitor().stop(); + timeTaken = System.currentTimeMillis() - start; + } + + } + + public void checkAssertions() { + boolean timeoutExpected = timeTaken >= maxTimePermitted; + long overrun = timeTaken - maxTimePermitted; + if ((timeoutExpected) && (!didTimeout)) { + if (overrun <= MAXIMUM_TIMING_DISCREPANCY_MS) { + // allow for minor timing discrepancies (up to MAXIMUM_TIMING_DISCREPANCY_MS) in heavily + // multi-threaded tests + // where a thread that overruns marginally escapes the + // scheduled overrun + // detection logic.
+ return; + } + } + assertEquals("Took " + timeTaken + " but timeout set to " + maxTimePermitted, timeoutExpected, didTimeout); + boolean wasTooSlowReportingOverrun = overrun > (checkForTimeoutEvery + MAXIMUM_TIMING_DISCREPANCY_MS); + assertFalse("Too slow reporting timeout - overrun of " + overrun + " with check every " + checkForTimeoutEvery, + wasTooSlowReportingOverrun); + + } + + } + +} diff --git a/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java b/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java index 30038ce18cfcb..9e98f3508e79f 100644 --- a/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java +++ b/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java @@ -473,7 +473,7 @@ public void testOmitTermFreqsAndPositions() throws Exception { client().prepareSearch().setQuery(matchQuery("field1", "quick brown").type(MatchQueryBuilder.Type.PHRASE).slop(0)).get(); fail("SearchPhaseExecutionException should have been thrown"); } catch (SearchPhaseExecutionException e) { - assertTrue(e.getMessage().endsWith("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]; }")); + assertTrue(e.getMessage().contains("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]")); } cluster().wipeIndices("test"); } catch (MapperParsingException ex) { diff --git a/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java b/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java index c9abce165fcd0..941bffa3565f5 100644 --- a/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java +++ b/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java @@ -19,16 +19,21 @@ package org.elasticsearch.search.timeout; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.search.SearchResponse; import 
org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.groovy.GroovyScriptEngineService; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.hamcrest.Matchers; import org.junit.Test; import static org.elasticsearch.index.query.FilterBuilders.scriptFilter; -import static org.elasticsearch.index.query.QueryBuilders.filteredQuery; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.*; import static org.hamcrest.Matchers.equalTo; /** @@ -36,6 +41,19 @@ @ElasticsearchIntegrationTest.ClusterScope(scope=ElasticsearchIntegrationTest.Scope.SUITE) public class SearchTimeoutTests extends ElasticsearchIntegrationTest { + + //Timings in this test rely on having 2 shards + @Override + protected int minimumNumberOfShards() { + return 2; + } + + //Timings in this test rely on having 2 shards + @Override + protected int maximumNumberOfShards() { + return 2; + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(GroovyScriptEngineService.GROOVY_SCRIPT_SANDBOX_ENABLED, false).build(); @@ -51,4 +69,91 @@ public void simpleTimeoutTest() throws Exception { .execute().actionGet(); assertThat(searchResponse.isTimedOut(), equalTo(true)); } + + @Test + public void timedFileAccessTests() throws Exception { + createIndex("test"); + + //Create an index with documents that have test fields that hold delay times required to simulate + // processing the documents' content during a search + int []delays={0,0,0,0,0,10,10,10,10,10,100,100,100,100,100,1000,1000}; + for (int i = 0; i < delays.length; i++) { + 
client().prepareIndex("test", "type", Integer.toString(i)) + .setSource("delay", delays[i]).execute().actionGet(); + } + client().admin().indices().prepareRefresh().execute().actionGet(); + + //Run a series of searches on docs with different processing delays between doc retrieves + // and expected search timeouts + assertThat(runTimeRestrictedSearch(0,10,1000).isTimedOut(),equalTo(false)); + assertThat(runTimeRestrictedSearch(10,10,1000).isTimedOut(),equalTo(false)); + assertThat(runTimeRestrictedSearch(100,100,200).isTimedOut(),equalTo(true)); + + //Test partial results + //One shard should take <250 ms but the other shard should take >250ms + SearchResponse longResponse = runTimeRestrictedSearch(100,100,250); + assertThat(longResponse.isTimedOut(),equalTo(true)); + SearchHits hits = longResponse.getHits(); + assertTrue("Must have some hits", hits.getTotalHits()>0); + for (SearchHit searchHit : hits) { + Integer delayField = (Integer) searchHit.getSource().get("delay"); + + assertNotNull(delayField); + int delay=delayField.intValue(); + //Must have only retrieved docs before the search timeout threshold + assertThat(delay, Matchers.lessThan(500)); + } + } + @Test + public void timedAggScriptTests() throws Exception { + createIndex("test"); + + // Create an index with documents that have test fields that hold delay times required to simulate + // processing the documents' content during a search + int []delays={50,50,1000,50,50,2000,10,10}; + for (int i = 0; i < delays.length; i++) { + client().prepareIndex("test", "type", Integer.toString(i)) + .setSource("delay", delays[i]).execute().actionGet(); + } + client().admin().indices().prepareRefresh().execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet(); + + //One shard should run 50+50+10ms delay and another 50+1000+50+10ms + SearchResponse oneShardTimedoutResponse = runTimeRestrictedAgg(500, 
0, 1000); + assertThat(oneShardTimedoutResponse.isTimedOut(),equalTo(true)); + Aggregations aggs = oneShardTimedoutResponse.getAggregations(); + assertNotNull(aggs); + //One shard should run 50+50+1000+10 delays and another 50+50 + 2000+10 - both > 500ms timeout + SearchResponse bothShardsTimedoutResponse = runTimeRestrictedAgg(500, 0, 2000); + assertThat(bothShardsTimedoutResponse.isTimedOut(),equalTo(true)); + aggs = bothShardsTimedoutResponse.getAggregations(); + assertNull(aggs); + } + + private SearchResponse runTimeRestrictedAgg(int overallTimeout, int fromDelay, int toDelay) { + SearchResponse searchResponse = client().prepareSearch("test") + .setTimeout(overallTimeout+"ms") + .setQuery(rangeQuery("delay").from(fromDelay).to(toDelay)) + //Use _source field in script to force slow evaluations that involve disk access + .addAggregation(new TermsBuilder("delayAgg").script("Integer delay=_source.delay;" + + "Thread.sleep(delay);" + //If needed, explicit checking of timeouts rather than relying on implicit checks on file accesses.. 
+// + "org.elasticsearch.common.util.concurrent.ActivityTimeMonitor.getCurrentThreadMonitor().checkForTimeout();" + + "return delay")) + .execute().actionGet(); + return searchResponse; + } + + private SearchResponse runTimeRestrictedSearch(int fromDelay, int toDelay, int overallTimeout){ + SearchResponse searchResponse = client().prepareSearch("test") + .setTimeout(overallTimeout+"ms") + .setQuery(filteredQuery(rangeQuery("delay").from(fromDelay).to(toDelay), + //Use _source field in script to force slow evaluations that involve disk access + scriptFilter("Integer delay=_source.delay;" + + "Thread.sleep(delay);" + + "return true;"))) + .execute().actionGet(); + return searchResponse; + } + } From f74acb26db6e2ccd7ad9e92352ba303d76aee1c6 Mon Sep 17 00:00:00 2001 From: markharwood Date: Tue, 26 Aug 2014 11:11:49 +0100 Subject: [PATCH 2/6] Added timeout checks on scripts to handle cases where time is not spent in disk accesses --- .../elasticsearch/script/ScriptService.java | 96 ++++++++++++++++++- 1 file changed, 94 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/elasticsearch/script/ScriptService.java b/src/main/java/org/elasticsearch/script/ScriptService.java index b35b85e203357..fd886b7bb8a5a 100644 --- a/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/src/main/java/org/elasticsearch/script/ScriptService.java @@ -23,6 +23,8 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableMap; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.search.Scorer; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.ActionListener; @@ -48,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import 
org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -455,11 +458,100 @@ public ExecutableScript executable(String lang, String script, ScriptType script } public ExecutableScript executable(CompiledScript compiledScript, Map vars) { - return scriptEngines.get(compiledScript.lang()).executable(compiledScript.compiled(), vars); + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + ExecutableScript result = scriptEngines.get(compiledScript.lang()).executable(compiledScript.compiled(), vars); + if (atm != null) { + // Add a wrapper that checks for timeout + result = new TimeRestrictedExecutableScript(result, atm); + } + return result; } public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map vars) { - return scriptEngines.get(compiledScript.lang()).search(compiledScript.compiled(), lookup, vars); + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + SearchScript result = scriptEngines.get(compiledScript.lang()).search(compiledScript.compiled(), lookup, vars); + if (atm != null) { + // Add a wrapper that checks for timeout + result = new TimeRestrictedSearchScript(result, atm); + } + return result; + } + private static class TimeRestrictedExecutableScript implements ExecutableScript{ + + private final ExecutableScript script; + ActivityTimeMonitor atm; + + public TimeRestrictedExecutableScript(ExecutableScript script, ActivityTimeMonitor atm) { + this.script = script; + this.atm = atm; + } + + public void setNextVar(String name, Object value) { + script.setNextVar(name, value); + } + + public Object run() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.run(); + } + + public Object unwrap(Object value) { + return script.unwrap(value); + } + } + + private static final class 
TimeRestrictedSearchScript extends TimeRestrictedExecutableScript implements SearchScript{ + + private final SearchScript script; + + public TimeRestrictedSearchScript(SearchScript script, ActivityTimeMonitor atm) { + super(script, atm); + this.script = script; + } + + public void setScorer(Scorer scorer) { + script.setScorer(scorer); + } + + public void setNextReader(AtomicReaderContext reader) { + script.setNextReader(reader); + // The test framework revealed that it is possible for SearchScript objects + // to be created in one thread but used by another so we need to reset the + // ActivityTimeMonitor instance to the correct one associated with this thread + atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + } + + + public void setNextDocId(int doc) { + script.setNextDocId(doc); + } + + public void setNextSource(Map source) { + script.setNextSource(source); + } + + public float runAsFloat() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.runAsFloat(); + } + + public long runAsLong() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.runAsLong(); + } + + public double runAsDouble() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.runAsDouble(); + } } public SearchScript search(SearchLookup lookup, String lang, String script, ScriptType scriptType, @Nullable Map vars) { From e8912d391af498f0ad605bae878254a69cbc6ead Mon Sep 17 00:00:00 2001 From: markharwood Date: Wed, 27 Aug 2014 11:15:36 +0100 Subject: [PATCH 3/6] Added support for RandomAccessInput used in accesses like DocValues-backed aggs --- .../org/elasticsearch/index/store/Store.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 10826b05c9aa7..eb09f1a00f285 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -520,8 +520,52 @@ 
public int hashCode() { public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { return new TimeLimitedIndexInput(sliceDescription, in.slice(sliceDescription, offset, length), this.activityState); } + + @Override + public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException { + // We may be being called by a thread subject to a timeout-constraint. + // Unlike other disk accesses where the "root" IndexInput is cloned before use + // (where we create the activityState instance in clones) the randomAccessSlice calls + // are made to the root IndexInput instance which has null for an activityState so we + // need an extra call here to establish if the calling thread is timed or not. + ActivityTimeMonitor threadActivityState = ActivityTimeMonitor.getCurrentThreadMonitor(); + RandomAccessInput result = in.randomAccessSlice(offset, length); + if (threadActivityState != null) { + result = new TimeLimitedRandomAccessInput(result, threadActivityState); + } + return result; + } } + class TimeLimitedRandomAccessInput implements RandomAccessInput { + private RandomAccessInput slice; + private ActivityTimeMonitor activityState; + + public TimeLimitedRandomAccessInput(RandomAccessInput slice, ActivityTimeMonitor activityState) { + this.slice = slice; + this.activityState = activityState; + } + + public byte readByte(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readByte(pos); + } + + public short readShort(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readShort(pos); + } + + public int readInt(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readInt(pos); + } + + public long readLong(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readLong(pos); + } + } @Override From bafa34f877a525326ca6e5d5f6cf74efa6d32b7c Mon Sep 17 00:00:00 2001 From: markharwood Date: Fri, 26 Sep 2014 15:56:39 
+0100 Subject: [PATCH 4/6] =?UTF-8?q?Added=20settings=20that=20only=20wrap=20Luc?= =?UTF-8?q?ene=20data=20structures=20if=20an=20index=20setting=20requests?= =?UTF-8?q?=20=E2=80=9Cthorough=E2=80=9D=20timeout=20checking.=20Avoids=20?= =?UTF-8?q?paying=20the=20performance=20penalty=20of=20timeout=20checks=20?= =?UTF-8?q?if=20not=20required?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cluster/metadata/IndexMetaData.java | 1 + .../org/elasticsearch/index/store/Store.java | 8 +++++-- .../search/timeout/SearchTimeoutTests.java | 24 ++++++++++++------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index b6d5d66139281..56f765ea9b0f0 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -154,6 +154,7 @@ public static State fromString(String state) { public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards"; public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas"; + public static final String SETTING_THOROUGH_TIMEOUT_CHECKS = "index.thorough_timeout_checks"; public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final String SETTING_READ_ONLY = "index.blocks.read_only"; public static final String SETTING_BLOCKS_READ = "index.blocks.read"; diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index eb09f1a00f285..647217107ade3 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -32,6 +32,7 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.Version; import org.elasticsearch.ExceptionsHelper; +import 
org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.Compressor; @@ -93,6 +94,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex private final DirectoryService directoryService; private final StoreDirectory directory; private final DistributorDirectory distributorDirectory; + private boolean enabledThoroughTimeouts; @Inject public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException { @@ -100,6 +102,7 @@ public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecServic this.codecService = codecService; this.directoryService = directoryService; this.distributorDirectory = new DistributorDirectory(distributor); + this.enabledThoroughTimeouts = indexSettings.getAsBoolean(IndexMetaData.SETTING_THOROUGH_TIMEOUT_CHECKS, false); this.directory = new StoreDirectory(distributorDirectory); } @@ -425,8 +428,9 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } // Merges and other non-search operations are not // subject to timeout constraints. Only add timeout - // capable wrappers for search operations. 
- if (context.context == Context.READ) { + // capable wrappers for search operations and where + // index settings explicitly request this feature + if(enabledThoroughTimeouts && (context.context == Context.READ)) { in = new TimeLimitedIndexInput(name, in, null); } return in; diff --git a/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java b/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java index 941bffa3565f5..f4e296256a1b0 100644 --- a/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java +++ b/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.groovy.GroovyScriptEngineService; @@ -32,6 +33,8 @@ import org.hamcrest.Matchers; import org.junit.Test; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.index.query.FilterBuilders.scriptFilter; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.hamcrest.Matchers.equalTo; @@ -41,17 +44,20 @@ @ElasticsearchIntegrationTest.ClusterScope(scope=ElasticsearchIntegrationTest.Scope.SUITE) public class SearchTimeoutTests extends ElasticsearchIntegrationTest { - - //Timings in this test rely on having 2 shards - @Override - protected int minimumNumberOfShards() { - return 2; - } - //Timings in this test rely on having 2 shards @Override - protected int maximumNumberOfShards() { - return 2; + public Settings indexSettings() { + ImmutableSettings.Builder builder = ImmutableSettings.builder(); + //Timings in this test rely on 
having 2 shards + builder.put(SETTING_NUMBER_OF_SHARDS, 2).build(); + builder.put(IndexMetaData.SETTING_THOROUGH_TIMEOUT_CHECKS, true).build(); + if (randomizeNumberOfShardsAndReplicas()) { + int numberOfReplicas = numberOfReplicas(); + if (numberOfReplicas >= 0) { + builder.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build(); + } + } + return builder.build(); } @Override From 30aca173047e30e7a6be6ecdd9b746b4975c9142 Mon Sep 17 00:00:00 2001 From: markharwood Date: Fri, 26 Sep 2014 17:16:00 +0100 Subject: [PATCH 5/6] Changes to make background monitor thread play nicely with test framework --- .../common/util/concurrent/ActivityTimeMonitor.java | 10 ++++++++-- .../elasticsearch/test/ElasticsearchThreadFilter.java | 5 +++-- .../threadpool/SimpleThreadPoolTests.java | 10 +++++----- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java index 0c98452cb6f9f..4311a436ac4d2 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java @@ -44,6 +44,8 @@ */ public class ActivityTimeMonitor implements Delayed { + public static final String TIMEOUT_MONITOR_THREADNAME = "ActivityTimeoutMonitor"; + public enum ActivityStatus { INACTIVE, ACTIVE, OVERRUNNING } @@ -149,7 +151,6 @@ private boolean assertThread() { static { timeoutMonitorThread = new TimeoutMonitorThread(); - timeoutMonitorThread.setDaemon(true); timeoutMonitorThread.start(); } @@ -229,6 +230,11 @@ public static ActivityTimeMonitor stopActivity() { private static final class TimeoutMonitorThread extends Thread { // A queue of Thread statuses sorted by their scheduled timeout time. 
DelayQueue inboundMessageQueue = new DelayQueue(); + + TimeoutMonitorThread(){ + super(TIMEOUT_MONITOR_THREADNAME); + setDaemon(true); + } @Override public void run() { @@ -238,7 +244,7 @@ public void run() { // timeout then set the volatile boolean indicating an error inboundMessageQueue.take().moveToOverrun(); } catch (InterruptedException e1) { - // Need to keep on trucking + // Need to keep on trucking } } } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java b/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java index 7078f84519676..9114627d173ab 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java @@ -21,7 +21,7 @@ import com.carrotsearch.randomizedtesting.ThreadFilter; import org.elasticsearch.common.network.MulticastChannel; -import org.elasticsearch.test.hamcrest.RegexMatcher; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.tribe.TribeTests; import java.util.regex.Pattern; @@ -49,7 +49,8 @@ public boolean reject(Thread t) { if (threadName.contains("[" + MulticastChannel.SHARED_CHANNEL_NAME + "]") || threadName.contains("[" + ElasticsearchSingleNodeTest.nodeName() + "]") - || threadName.contains("Keep-Alive-Timer")) { + || threadName.contains("Keep-Alive-Timer") + || threadName.contains(ActivityTimeMonitor.TIMEOUT_MONITOR_THREADNAME)) { return true; } return nodePrefix.matcher(t.getName()).find(); diff --git a/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java b/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java index 28cd227e0abff..b229788bf34ea 100644 --- a/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java +++ b/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java @@ -26,16 +26,16 @@ import org.elasticsearch.common.network.MulticastChannel; import 
org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.*; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.ElasticsearchSingleNodeTest; -import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import org.elasticsearch.test.hamcrest.RegexMatcher; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.tribe.TribeTests; @@ -52,7 +52,6 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.*; @@ -114,7 +113,8 @@ public void verifyThreadNames() throws Exception { // or the ones that are occasionally come up from ElasticsearchSingleNodeTest if (threadName.contains("[" + MulticastChannel.SHARED_CHANNEL_NAME + "]") || threadName.contains("[" + ElasticsearchSingleNodeTest.nodeName() + "]") - || threadName.contains("Keep-Alive-Timer")) { + || threadName.contains("Keep-Alive-Timer") + || threadName.contains(ActivityTimeMonitor.TIMEOUT_MONITOR_THREADNAME)) { continue; } String nodePrefix = "(" + Pattern.quote(InternalTestCluster.TRANSPORT_CLIENT_PREFIX) + ")?(" + From 
bae1a7e7eb0e61ddab13fca09d89597f3699c807 Mon Sep 17 00:00:00 2001 From: markharwood Date: Fri, 26 Sep 2014 18:04:47 +0100 Subject: [PATCH 6/6] =?UTF-8?q?Added=20timeout=20check=20in=20slow=20secti?= =?UTF-8?q?on=20that=20doesn=E2=80=99t=20appear=20to=20hit=20disk=20so=20d?= =?UTF-8?q?oesn=E2=80=99t=20trigger=20the=20Lucene=20disk=20accesses=20tha?= =?UTF-8?q?t=20contain=20timeout=20checks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../aggregations/bucket/terms/support/IncludeExclude.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java index 86c74e89302c3..d8143df4e6f82 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -173,6 +174,7 @@ public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals, Values } } else { if(hasRegexTest) { + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); // We have includeVals that are a regex or only regex excludes - we need to do the potentially // slow option of hitting termsEnum for every term in the index. 
TermsEnum globalTermsEnum = valueSource.globalOrdinalsValues().termsEnum(); @@ -181,6 +183,9 @@ public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals, Values if (accept(term)) { acceptedGlobalOrdinals.set(globalTermsEnum.ord()); } + if (atm != null) { + atm.checkForTimeout(); + } } } catch (IOException e) { throw ExceptionsHelper.convertToElastic(e);