diff --git a/src/main/java/org/elasticsearch/ExceptionsHelper.java b/src/main/java/org/elasticsearch/ExceptionsHelper.java index 7a2c4e91f22b8..36d1c9a67e0d3 100644 --- a/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -74,6 +74,39 @@ public static Throwable unwrapCause(Throwable t) { } return result; } + + + + /** + * Determines if a given exception has a particular type of exception + * as a cause. + * @param e The exception received + * @param exType The type of exception that is being looked for as a cause + * @return true if the exception or any of the nested exceptions revealed by + * getCause() are of the given exType. + */ + public static boolean wasCausedBy(Throwable e, Class exType) { + if (exType == null) { + return false; + } + if (exType.isInstance(e)) { + return true; + } + Throwable cause = e.getCause(); + if (cause == e) { + return false; + } + while (cause != null) { + if (exType.isInstance(cause)) { + return true; + } + if (cause.getCause() == cause) { + break; + } + cause = cause.getCause(); + } + return false; + } public static String detailedMessage(Throwable t) { return detailedMessage(t, false, 0); @@ -184,4 +217,31 @@ public static boolean isOOM(Throwable t) { ) ); } + + /** + * Extracts ElasticsearchException causes from non-serializable exceptions e.g. + * those produced by script. + * @param e The exception received + * @return null or the top-most exception revealed by + * getCause() which is of the type ElasticsearchException + */ + public static ElasticsearchException extractElasticsearchCause(Throwable e) { + if (e instanceof ElasticsearchException) { + return (ElasticsearchException) e; + } + Throwable cause = e.getCause(); + if (cause == e) { + return null; + } + while (cause != null) { + if (cause instanceof ElasticsearchException) { + return (ElasticsearchException) cause; + } + if (cause.getCause() == cause) { + break; + } + cause = cause.getCause(); + } + return null; + } } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index b6d5d66139281..56f765ea9b0f0 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -154,6 +154,7 @@ public static State fromString(String state) { public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards"; public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas"; + public static final String SETTING_THOROUGH_TIMEOUT_CHECKS = "index.thorough_timeout_checks"; public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final String SETTING_READ_ONLY = "index.blocks.read_only"; public static final String SETTING_BLOCKS_READ = "index.blocks.read"; diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java new file mode 100644 index 0000000000000..4311a436ac4d2 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitor.java @@ -0,0 +1,268 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Utility class which efficiently monitors time-limited activities. When + * start() is called on an ActivityTimeMonitor with a maximum length of time, + * the status will automatically be updated to OVERRUNNING after the allotted + * time has expired. If a call is made to checkForTimeout() when the status is + * OVERRUNING this object will throw a {@link ActivityTimedOutException} Calling + * stop() will reset the monitor status to inactive. It is only permitted + * to call start() when inactive so in order to establish a new deadline for an active + * activity you will need to call stop() first then cal start() with the new deadline. + * When start() is called the ActivityTimeMonitor is bound to the current thread via + * a ThreadLocal and calling getCurrentThreadMonitor() will return the instance + * associated with the current thread. + * + * A background thread is used to handle all instances and set the OVERRUNING status + * at the appropriate points + */ +public class ActivityTimeMonitor implements Delayed { + + public static final String TIMEOUT_MONITOR_THREADNAME = "ActivityTimeoutMonitor"; + + public enum ActivityStatus { + INACTIVE, ACTIVE, OVERRUNNING + } + private final AtomicReferenceFieldUpdater STATUS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ActivityTimeMonitor.class, ActivityStatus.class, "status"); + private volatile ActivityStatus status = ActivityStatus.INACTIVE; + private long scheduledTimeoutInMilliseconds = Long.MIN_VALUE; + private final Thread associatedThread; + + private ActivityTimeMonitor() { + associatedThread = Thread.currentThread(); + } + + public boolean moveToInactive() { + do { + ActivityStatus s = status; + switch (s) { + case INACTIVE: + return false; + case OVERRUNNING: + case ACTIVE: + if (STATUS_UPDATER.compareAndSet(this, s, ActivityStatus.INACTIVE)) { + return true; + } + default: + assert false; + } + } while (true); + } + + public void moveToActive() { + do { + ActivityStatus s = status; + switch (s) { + case ACTIVE: + throw new ElasticsearchIllegalArgumentException("Can't move to active - already active"); + case OVERRUNNING: + assert false; + case INACTIVE: + if (STATUS_UPDATER.compareAndSet(this, s, ActivityStatus.ACTIVE)) { + return; + } + default: + assert false; + } + } while (true); + } + + public void moveToOverrun() { + do { + ActivityStatus s = status; + switch (s) { + case OVERRUNNING: + assert false; + case INACTIVE: + return; + case ACTIVE: + if (STATUS_UPDATER.compareAndSet(this, s, ActivityStatus.OVERRUNNING)) { + return; + } + default: + assert false; + } + } while (true); + } + + public ActivityStatus getStatus() { + return status; + } + + public void start(long maxTime) { + assert status == ActivityStatus.INACTIVE; + this.scheduledTimeoutInMilliseconds = System.currentTimeMillis() + maxTime; + moveToActive(); + timeoutMonitorThread.add(this); + timeoutStateTL.set(this); + } + + public void stop() { + if (moveToInactive()) { + // need to remove the old timeout status from the monitor thread - + // this will also set status to INACTIVE + timeoutMonitorThread.remove(this); + } + } + + public void checkForTimeout() { + assert assertThread(); + if (status == ActivityStatus.OVERRUNNING) { + throw new ActivityTimedOutException("Timed out thread " + Thread.currentThread().getName(), getDelay(TimeUnit.MILLISECONDS)); + } + + } + + private boolean assertThread() { + assert associatedThread == Thread.currentThread() : "ActivityTimeMonitor is not threadsafe - can't be shared across threads"; + return true; + } + + // an internal thread that monitors timeout activity + private static final TimeoutMonitorThread timeoutMonitorThread; + // Thread-Local holding the current timeout state associated with threads + private static final ThreadLocal timeoutStateTL = new ThreadLocal(); + + static { + timeoutMonitorThread = new TimeoutMonitorThread(); + timeoutMonitorThread.start(); + } + + /** + * + * @return the number of timed activities currently being monitored + */ + public static int getCurrentNumActivities() { + return timeoutMonitorThread.inboundMessageQueue.size(); + } + + /** + * Get the timeout status information for the current thread + * + * @return null or the current ActivityTimeMonitor for the current thread + */ + public static ActivityTimeMonitor getCurrentThreadMonitor() { + return timeoutStateTL.get(); + } + + /** + * Get the timeout status information for the current thread or create if it does not exist + * + * @return the current thread's ActivityTimeMonitor + */ + public static ActivityTimeMonitor getOrCreateCurrentThreadMonitor() { + + ActivityTimeMonitor result = timeoutStateTL.get(); + if(result==null){ + result=new ActivityTimeMonitor(); + timeoutStateTL.set(result); + } + assert result.assertThread(); + return result; + } + + @Override + public int compareTo(Delayed o) { + ActivityTimeMonitor other = (ActivityTimeMonitor) o; + return (int) (scheduledTimeoutInMilliseconds - other.scheduledTimeoutInMilliseconds); + } + + @Override + public long getDelay(TimeUnit unit) { + long now = System.currentTimeMillis(); + return unit.convert(scheduledTimeoutInMilliseconds - now, TimeUnit.MILLISECONDS); + } + + /** + * Helper method to start a new activity + * @param maxTimeInMilliseconds the maximum length of time allowed for an activity + * @return the existing monitor for the current thread or, a new one if none existed before + */ + public static ActivityTimeMonitor startActivity(long maxTimeInMilliseconds) { + ActivityTimeMonitor result = getCurrentThreadMonitor(); + if (result == null) { + result = new ActivityTimeMonitor(); + } + result.start(maxTimeInMilliseconds); + return result; + } + + /** + * Helper method called to mark the stopping of a timed activity associated with the current thread + */ + public static ActivityTimeMonitor stopActivity() { + ActivityTimeMonitor result = getCurrentThreadMonitor(); + if (result != null) { + result.stop(); + } + return result; + } + + + // A background thread used to set the timed-out status for threads when + // they occur + private static final class TimeoutMonitorThread extends Thread { + // A queue of Thread statuses sorted by their scheduled timeout time. + DelayQueue inboundMessageQueue = new DelayQueue(); + + TimeoutMonitorThread(){ + super(TIMEOUT_MONITOR_THREADNAME); + setDaemon(true); + } + + @Override + public void run() { + while (true) { + try { + // Block until the first ActivityTimeMonitor has reached its + // timeout then set the volatile boolean indicating an error + inboundMessageQueue.take().moveToOverrun(); + } catch (InterruptedException e1) { + // Need to keep on trucking + } + } + } + + void remove(ActivityTimeMonitor state) { + assert state.getStatus() == ActivityStatus.INACTIVE; + inboundMessageQueue.remove(state); + } + + void add(ActivityTimeMonitor state) { + assert state.getStatus() == ActivityStatus.ACTIVE; + if (state.scheduledTimeoutInMilliseconds <= System.currentTimeMillis()) { + state.moveToOverrun(); + } else { + inboundMessageQueue.add(state); + } + } + } + + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimedOutException.java b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimedOutException.java new file mode 100644 index 0000000000000..39ac6f0c834ae --- /dev/null +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ActivityTimedOutException.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.ElasticsearchException; + +public class ActivityTimedOutException extends ElasticsearchException +{ + + private long overrunMilliseconds; + + public ActivityTimedOutException(String msg, long overrun) + { + super(msg); + this.overrunMilliseconds=overrun; + } + + public long getOverrunMilliseconds() + { + return overrunMilliseconds; + } + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 5095c477555fd..647217107ade3 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -26,11 +26,13 @@ import org.apache.lucene.codecs.lucene46.Lucene46SegmentInfoFormat; import org.apache.lucene.index.*; import org.apache.lucene.store.*; +import org.apache.lucene.store.IOContext.Context; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.Version; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.Compressor; @@ -42,6 +44,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.settings.IndexSettings; @@ -91,6 +94,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex private final DirectoryService directoryService; private final StoreDirectory directory; private final DistributorDirectory distributorDirectory; + private boolean enabledThoroughTimeouts; @Inject public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException { @@ -98,6 +102,7 @@ public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecServic this.codecService = codecService; this.directoryService = directoryService; this.distributorDirectory = new DistributorDirectory(distributor); + this.enabledThoroughTimeouts = indexSettings.getAsBoolean(IndexMetaData.SETTING_THOROUGH_TIMEOUT_CHECKS, false); this.directory = new StoreDirectory(distributorDirectory); } @@ -421,9 +426,152 @@ public IndexInput openInput(String name, IOContext context) throws IOException { IOUtils.closeWhileHandlingException(in); } } + // Merges and other non-search operations are not + // subject to timeout constraints. Only add timeout + // capable wrappers for search operations and where + // index settings explicitly request this feature + if(enabledThoroughTimeouts && (context.context == Context.READ)) { + in = new TimeLimitedIndexInput(name, in, null); + } return in; } + class TimeLimitedIndexInput extends IndexInput { + + private final IndexInput in; + private final String name; + private final ActivityTimeMonitor activityState; + + public TimeLimitedIndexInput(String resourceDescription, IndexInput in, ActivityTimeMonitor monitor) { + super(resourceDescription); + this.in = in; + this.activityState = monitor; + this.name = resourceDescription; + } + + @Override + public void close() throws IOException { + try { + if (activityState != null) { + activityState.checkForTimeout(); + } + } finally { + in.close(); + } + } + + @Override + public long getFilePointer() { + if (activityState != null) { + activityState.checkForTimeout(); + } + return in.getFilePointer(); + } + + @Override + public void seek(long pos) throws IOException { + if (activityState != null) { + activityState.checkForTimeout(); + } + in.seek(pos); + } + + @Override + public long length() { + if (activityState != null) { + activityState.checkForTimeout(); + } + return in.length(); + } + + @Override + public byte readByte() throws IOException { + if (activityState != null) { + activityState.checkForTimeout(); + } + return in.readByte(); + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + if (activityState != null) { + activityState.checkForTimeout(); + } + in.readBytes(b, offset, len); + } + + @Override + public IndexInput clone() { + // because of reuse of IndexInput clones in search thread pools + // we may alternate between timed and untimed search operations + // and we cannot pre-empt here how this IndexInput might be used. + // So always need to assume the worst and wrap IndexInputs with + // timeout monitoring capabilities. + return new TimeLimitedIndexInput(name, in.clone(), ActivityTimeMonitor.getOrCreateCurrentThreadMonitor()); + } + + @Override + public boolean equals(Object o) { + return in.equals(o); + } + + @Override + public int hashCode() { + return in.hashCode(); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + return new TimeLimitedIndexInput(sliceDescription, in.slice(sliceDescription, offset, length), this.activityState); + } + + @Override + public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException { + // We may be being called by a thread subject to a timeout-constraint. + // Unlike other disk accesses where the "root" IndexInput is cloned before use + // (where we create the activityState instance in clones) the randomAccessSlice calls + // are made to the root IndexInput instance which has null for an activityState so we + // need an extra call here to establish if the calling thread is timed or not. + ActivityTimeMonitor threadActivityState = ActivityTimeMonitor.getCurrentThreadMonitor(); + RandomAccessInput result = in.randomAccessSlice(offset, length); + if (threadActivityState != null) { + result = new TimeLimitedRandomAccessInput(result, threadActivityState); + } + return result; + } + } + + class TimeLimitedRandomAccessInput implements RandomAccessInput { + private RandomAccessInput slice; + private ActivityTimeMonitor activityState; + + public TimeLimitedRandomAccessInput(RandomAccessInput slice, ActivityTimeMonitor activityState) { + this.slice = slice; + this.activityState = activityState; + } + + public byte readByte(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readByte(pos); + } + + public short readShort(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readShort(pos); + } + + public int readInt(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readInt(pos); + } + + public long readLong(long pos) throws IOException { + activityState.checkForTimeout(); + return slice.readLong(pos); + } + } + + @Override public void close() throws IOException { assert false : "Nobody should close this directory except of the Store itself"; diff --git a/src/main/java/org/elasticsearch/script/ScriptService.java b/src/main/java/org/elasticsearch/script/ScriptService.java index b35b85e203357..fd886b7bb8a5a 100644 --- a/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/src/main/java/org/elasticsearch/script/ScriptService.java @@ -23,6 +23,8 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableMap; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.search.Scorer; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.ActionListener; @@ -48,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -455,11 +458,100 @@ public ExecutableScript executable(String lang, String script, ScriptType script } public ExecutableScript executable(CompiledScript compiledScript, Map vars) { - return scriptEngines.get(compiledScript.lang()).executable(compiledScript.compiled(), vars); + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + ExecutableScript result = scriptEngines.get(compiledScript.lang()).executable(compiledScript.compiled(), vars); + if (atm != null) { + // Add a wrapper that checks for timeout + result = new TimeRestrictedExecutableScript(result, atm); + } + return result; } public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map vars) { - return scriptEngines.get(compiledScript.lang()).search(compiledScript.compiled(), lookup, vars); + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + SearchScript result = scriptEngines.get(compiledScript.lang()).search(compiledScript.compiled(), lookup, vars); + if (atm != null) { + // Add a wrapper that checks for timeout + result = new TimeRestrictedSearchScript(result, atm); + } + return result; + } + private static class TimeRestrictedExecutableScript implements ExecutableScript{ + + private final ExecutableScript script; + ActivityTimeMonitor atm; + + public TimeRestrictedExecutableScript(ExecutableScript script, ActivityTimeMonitor atm) { + this.script = script; + this.atm = atm; + } + + public void setNextVar(String name, Object value) { + script.setNextVar(name, value); + } + + public Object run() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.run(); + } + + public Object unwrap(Object value) { + return script.unwrap(value); + } + } + + private static final class TimeRestrictedSearchScript extends TimeRestrictedExecutableScript implements SearchScript{ + + private final SearchScript script; + + public TimeRestrictedSearchScript(SearchScript script, ActivityTimeMonitor atm) { + super(script, atm); + this.script = script; + } + + public void setScorer(Scorer scorer) { + script.setScorer(scorer); + } + + public void setNextReader(AtomicReaderContext reader) { + script.setNextReader(reader); + // The test framework revealed that it is possible for SearchScript objects + // to be created in one thread but used by another so we need to reset the + // ActivityTimeMonitor instance to the correct one associated with this thread + atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + } + + + public void setNextDocId(int doc) { + script.setNextDocId(doc); + } + + public void setNextSource(Map source) { + script.setNextSource(source); + } + + public float runAsFloat() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.runAsFloat(); + } + + public long runAsLong() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.runAsLong(); + } + + public double runAsDouble() { + if (atm != null) { + atm.checkForTimeout(); + } + return script.runAsDouble(); + } } public SearchScript search(SearchLookup lookup, String lang, String script, ScriptType scriptType, @Nullable Map vars) { diff --git a/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java b/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java index 299d90e71a9c6..591f812aa0981 100644 --- a/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java +++ b/src/main/java/org/elasticsearch/script/groovy/GroovyScriptEngineService.java @@ -241,7 +241,10 @@ public Object run() { if (logger.isTraceEnabled()) { logger.trace("exception running Groovy script", e); } - throw new GroovyScriptExecutionException(ExceptionsHelper.detailedMessage(e)); + // Cause needs to be discovered by peeling away layers of non-serializable classes introduced by + // dynamic Groovy classes - was getting an "unrecognised class" error on the reducer node when deserializing + // unknown "cause" exceptions. + throw new GroovyScriptExecutionException(ExceptionsHelper.detailedMessage(e), ExceptionsHelper.extractElasticsearchCause(e)); } } diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index 03cbb7995d471..1616d7e0b6660 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -134,6 +134,8 @@ public class SearchService extends AbstractLifecycleComponent { private final ImmutableMap elementParsers; + private final long defaultSearchTimeout; + @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, IndicesWarmer indicesWarmer, ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, @@ -150,6 +152,11 @@ public SearchService(Settings settings, ClusterService clusterService, IndicesSe this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; this.indicesQueryCache = indicesQueryCache; + /* + * TODO: This default timeout here should be used if no timeout on the request is set - we can + * randomly set this to high values to exercise it? + */ + this.defaultSearchTimeout = componentSettings.getAsTime("default_timeout", timeValueMinutes(1)).millis(); TimeValue keepAliveInterval = componentSettings.getAsTime(KEEPALIVE_INTERVAL_COMPONENENT_KEY, timeValueMinutes(1)); // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes @@ -582,8 +589,14 @@ public void freeAllScrollContexts() { } private void contextProcessing(SearchContext context) { - // disable timeout while executing a search + // disable context tidy-ups while executing a search context.accessed(-1); + // start any timer required for this thread + if(context.timeoutInMillis()>0){ + // TODO could adjust remaining time spent executing this phase + // based on time already spent on prior phases + context.startTimedActivityForThisThread(context.timeoutInMillis()); + } } private void contextProcessedSuccessfully(SearchContext context) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java index 86c74e89302c3..d8143df4e6f82 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -173,6 +174,7 @@ public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals, Values } } else { if(hasRegexTest) { + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); // We have includeVals that are a regex or only regex excludes - we need to do the potentially // slow option of hitting termsEnum for every term in the index. TermsEnum globalTermsEnum = valueSource.globalOrdinalsValues().termsEnum(); @@ -181,6 +183,9 @@ public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals, Values if (accept(term)) { acceptedGlobalOrdinals.set(globalTermsEnum.ord()); } + if (atm != null) { + atm.checkForTimeout(); + } } } catch (IOException e) { throw ExceptionsHelper.convertToElastic(e); diff --git a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index a42ffca50fca0..99d8f91055d97 100644 --- a/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -312,9 +312,14 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray entry : fetchResults) { - entry.value.fetchResult().initCounter(); + FetchSearchResult fetchResult=entry.value.fetchResult(); + if(fetchResult.isTimedOut()) + { + timedOut=true; + } + fetchResult.initCounter(); } // merge hits @@ -326,21 +331,23 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray aggregationsList = new ArrayList<>(queryResults.size()); for (AtomicArray.Entry entry : queryResults) { - aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + if(!entry.value.queryResult().queryResult().searchTimedOut()){ + aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + } } aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService)); } diff --git a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index e070d212092e7..d998ffc69bdfb 100644 --- a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -23,9 +23,11 @@ import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.ReaderUtil; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.text.StringAndBytesText; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.concurrent.ActivityTimedOutException; import org.elasticsearch.index.fieldvisitor.*; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.FieldMappers; @@ -80,8 +82,18 @@ public FetchPhase(HighlightPhase highlightPhase, ScriptFieldsFetchSubPhase scrip @Override public void preProcess(SearchContext context) { } - public void execute(SearchContext context) { + try{ + internalExecute(context); + }catch(RuntimeException e){ + if(ExceptionsHelper.wasCausedBy(e,ActivityTimedOutException.class)){ + context.fetchResult().setTimedOut(true); + }else { + throw e; + } + } + } + private void internalExecute(SearchContext context) { FieldsVisitor fieldsVisitor; List extractFieldNames = null; diff --git a/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index 0db1096ad208c..dca4bc46c51e6 100644 --- a/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -23,12 +23,11 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.search.internal.InternalSearchHits.StreamContext; import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext; - /** * */ @@ -39,6 +38,7 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR private InternalSearchHits hits; // client side counter private transient int counter; + private boolean timedOut; public FetchSearchResult() { @@ -94,13 +94,27 @@ public static FetchSearchResult readFetchSearchResult(StreamInput in) throws IOE public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readLong(); - hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + timedOut = in.readBoolean(); + if (!timedOut) { + hits = InternalSearchHits.readSearchHits(in, + InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(id); - hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + out.writeBoolean(timedOut); + if (!timedOut) { + hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + } + } + public boolean isTimedOut() { + return timedOut; + } + + public void setTimedOut(boolean timedOut) { + this.timedOut=timedOut; } } diff --git a/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index fd1c1e513621e..ff36cef1dd27b 100644 --- a/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.search.*; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.MinimumScoreCollector; @@ -28,6 +29,7 @@ import org.elasticsearch.common.lucene.search.FilteredCollector; import org.elasticsearch.common.lucene.search.XCollector; import org.elasticsearch.common.lucene.search.XFilteredQuery; +import org.elasticsearch.common.util.concurrent.ActivityTimedOutException; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.search.dfs.CachedDfSource; import org.elasticsearch.search.internal.SearchContext.Lifetime; @@ -167,9 +169,18 @@ public void search(List leaves, Weight weight, Collector co assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set"; searchContext.queryResult().terminatedEarly(true); } + catch (RuntimeException e) { + if (ExceptionsHelper.wasCausedBy(e, ActivityTimedOutException.class)) { + searchContext.queryResult().searchTimedOut(true); + searchContext.queryResult().topDocs(new TopDocs(0, new ScoreDoc[0], 0f)); + } else { + throw e; + } + } if (terminateAfterSet && searchContext.queryResult().terminatedEarly() == null) { searchContext.queryResult().terminatedEarly(false); } + } else { super.search(leaves, weight, collector); } diff --git a/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/src/main/java/org/elasticsearch/search/internal/SearchContext.java index ac0ab04ed056e..341d4c87c5727 100644 --- a/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.filter.FilterCache; import org.elasticsearch.index.cache.fixedbitset.FixedBitSetFilterCache; @@ -74,14 +75,35 @@ public abstract class SearchContext implements Releasable { private static ThreadLocal current = new ThreadLocal<>(); public final static int DEFAULT_TERMINATE_AFTER = 0; + /** + * Sets the SearchContext for the current thread. Any required timeout settings + * for the SearchContext must be present at this point as this, figuratively speaking, + * is where any countdown clock starts ticking + * + * @param value The new current search context + */ public static void setCurrent(SearchContext value) { current.set(value); QueryParseContext.setTypes(value.types()); } public static void removeCurrent() { + //The Junit test FieldDataTermsFilterTests calls removeCurrent without first calling setCurrent. + //Rather than change that I have chosen to make this method resilient in such cases. + SearchContext currentContext = current.get(); + if(currentContext!=null && currentContext.timeoutInMillis() > 0) + { + // It would have been cleaner and more symmetrical to have the setCurrent/removeCurrent methods + // in this class call the timeout start/stop functions but there are 2 complications: + // 1) The parsing logic means that the setCurrent() method is called before the timeout value + // has actually been parsed. + // 2) It is conceivable that the timeout setting may be adjusted in-flight eg a future admin tool might revise + // the time allowed for a query to run. + ActivityTimeMonitor.stopActivity(); + } current.remove(); QueryParseContext.removeTypes(); + assert ActivityTimeMonitor.getCurrentThreadMonitor() == null || ActivityTimeMonitor.getCurrentThreadMonitor().getStatus() == ActivityTimeMonitor.ActivityStatus.INACTIVE; } public static SearchContext current() { @@ -374,4 +396,16 @@ public enum Lifetime { */ CONTEXT; } + + public void startTimedActivityForThisThread(long timeoutInMillis) { + ActivityTimeMonitor atm = ActivityTimeMonitor.getCurrentThreadMonitor(); + if (atm != null) { + // We had a prior timeout threshold in place that needs revising + atm.stop(); + } + if (timeoutInMillis() > 0) { + ActivityTimeMonitor.startActivity(timeoutInMillis); + } + } + } diff --git a/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 18d49406d0e97..a90a77f7f9f5f 100644 --- a/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -24,9 +24,11 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.concurrent.ActivityTimedOutException; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchPhase; import org.elasticsearch.search.aggregations.AggregationPhase; @@ -155,14 +157,31 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep } searchContext.queryResult().topDocs(topDocs); } catch (Throwable e) { - throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e); + if(ExceptionsHelper.wasCausedBy(e,ActivityTimedOutException.class)){ + searchContext.queryResult().searchTimedOut(true); + searchContext.queryResult().topDocs(new TopDocs(0, new ScoreDoc[0], 0f)); + }else + { + throw ExceptionsHelper.convertToRuntime(e); + } } finally { searchContext.searcher().finishStage(ContextIndexSearcher.Stage.MAIN_QUERY); } - if (rescore) { // only if we do a regular search - rescorePhase.execute(searchContext); + //We don't want to do any more than necessary if timed out + if(!searchContext.queryResult().searchTimedOut()){ + try{ + if (rescore) { // only if we do a regular search + rescorePhase.execute(searchContext); + } + suggestPhase.execute(searchContext); + aggregationPhase.execute(searchContext); + } catch (RuntimeException rte) { + if (ExceptionsHelper.wasCausedBy(rte, ActivityTimedOutException.class)) { + searchContext.queryResult().searchTimedOut(true); + } else { + throw rte; + } + } } - suggestPhase.execute(searchContext); - aggregationPhase.execute(searchContext); } } diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitorTest.java b/src/test/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitorTest.java new file mode 100644 index 0000000000000..df378d8020998 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/util/concurrent/ActivityTimeMonitorTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.ArrayList; +/** + * Tests that multi-threaded activities with various timeout settings can + * run simultaneously and still trigger timely and appropriate timeout errors. + */ +public class ActivityTimeMonitorTest extends ElasticsearchTestCase{ + + public static final int MAXIMUM_TIMING_DISCREPANCY_MS = 50; + + + @Test + public void testSingleTimeout() throws InterruptedException { + checkTimeoutsFireCorrectly(new TimedActivity(500, 100, 6)); + } + + @Test + public void testSingleNoTimeout() throws InterruptedException { + + checkTimeoutsFireCorrectly(new TimedActivity(500, 10, 5)); + } + + @Test + public void testOneLongNoTimeoutOneShortTimeout() throws InterruptedException { + + TimedActivity goodLong = new TimedActivity(500, 10, 5); + TimedActivity shortBad = new TimedActivity(100, 50, 5); + checkTimeoutsFireCorrectly(goodLong,shortBad); + } + @Test + public void testOneLongNoTimeoutManyShortTimeout() throws InterruptedException { + + ArrayList activities = new ArrayList(); + TimedActivity goodLong = new TimedActivity(500, 10, 5); + int numShorts = 10; + activities.add(goodLong); + for (int i = 0; i < numShorts; i++) { + activities.add(new TimedActivity(100, 50, 5)); + } + checkTimeoutsFireCorrectly((TimedActivity[]) activities.toArray(new TimedActivity[activities.size()])); + } + @Test + public void testOneLongTimeoutManyShortNoTimeout() throws InterruptedException { + ArrayList activities = new ArrayList(); + TimedActivity goodLong = new TimedActivity(500, 5, 120); + int numShorts = 10; + activities.add(goodLong); + for (int i = 0; i < numShorts; i++) { + activities.add(new TimedActivity(100, 5, 5)); + } + checkTimeoutsFireCorrectly((TimedActivity[]) activities.toArray(new TimedActivity[activities.size()])); + } + + + @Test + public void testTimerReset() throws InterruptedException { + ActivityTimeMonitor atm = ActivityTimeMonitor.getOrCreateCurrentThreadMonitor(); + atm.start(500); + Thread.sleep(100); + atm.checkForTimeout(); + atm.stop(); + atm.start(1000); // extend the time allotted to this thread + Thread.sleep(500); + atm.checkForTimeout(); + atm.stop(); + } + + @Test + public void testRandomThreads() throws InterruptedException { + // Tests for a mix of threads - some of which are expected to timeout + // while others not + int numThreads = 100; + int maxTimeToTake = 2000; + int maxNumChecks = 50; + int minTimeBetweenChecks = 5; + int maxTimeBetweenChecks = 50; + + ArrayList activities = new ArrayList(numThreads); + for (int i = 0; i < numThreads; i++) { + int maxTime = (int) (Math.random() * maxTimeToTake); + int numChecks = 1 + (int) (Math.random() * maxNumChecks); + int timeBetweenChecks = minTimeBetweenChecks + (int) (Math.random() * maxTimeBetweenChecks); + activities.add(new TimedActivity(maxTime, timeBetweenChecks, numChecks)); + } + checkTimeoutsFireCorrectly((TimedActivity[]) activities.toArray(new TimedActivity[activities.size()])); + int numTimeouts = 0; + for (TimedActivity timedActivity : activities) { + if (timedActivity.didTimeout) { + numTimeouts++; + } + } + assertTrue("Invalid test parameters - failed to produce a timeout condition", numTimeouts > 0); + assertEquals("All activities should be completed", 0, ActivityTimeMonitor.getCurrentNumActivities()); + } + + public void checkTimeoutsFireCorrectly(TimedActivity... activities ) throws InterruptedException{ + Thread threads[] = new Thread[activities.length]; + for (int i = 0; i < activities.length; i++) { + threads[i] = new Thread(activities[i]); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + for (int i = 0; i < activities.length; i++) { + activities[i].checkAssertions(); + } + } + + + static class TimedActivity implements Runnable{ + int maxTimePermitted; + int checkForTimeoutEvery; + int numberOfChecks; + boolean didTimeout = false; + private long timeTaken; + + public TimedActivity(int maxTimePermitted, int checkForTimeoutEvery, int numberOfChecks) { + super(); + this.maxTimePermitted = maxTimePermitted; + this.checkForTimeoutEvery = checkForTimeoutEvery; + this.numberOfChecks = numberOfChecks; + } + + @Override + public void run() { + long start = System.currentTimeMillis(); + ActivityTimeMonitor.getOrCreateCurrentThreadMonitor().start(maxTimePermitted); + try { + try { + for (int i = 0; i < numberOfChecks; i++) { + Thread.sleep(checkForTimeoutEvery); + ActivityTimeMonitor.getOrCreateCurrentThreadMonitor().checkForTimeout(); + } + } catch (ActivityTimedOutException e) { + didTimeout = true; + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } finally { + ActivityTimeMonitor.getOrCreateCurrentThreadMonitor().stop(); + timeTaken = System.currentTimeMillis() - start; + } + + } + + public void checkAssertions() { + boolean timeoutExpected = timeTaken >= maxTimePermitted; + long overrun = timeTaken - maxTimePermitted; + if ((timeoutExpected) && (!didTimeout)) { + if (overrun <= MAXIMUM_TIMING_DISCREPANCY_MS) { + // allow for minor timing discrepancies (<10ms) in heavily + // multi-threaded tests + // where a thread that overruns marginally escapes the + // scheduled overrun + // detection logic. + return; + } + } + assertEquals("Took " + timeTaken + " but timeout set to " + maxTimePermitted, timeoutExpected, didTimeout); + boolean wasTooSlowReportingOverrun = overrun > (checkForTimeoutEvery + MAXIMUM_TIMING_DISCREPANCY_MS); + assertFalse("Too slow reporting timeout - overrun of " + overrun + " with check every " + checkForTimeoutEvery, + wasTooSlowReportingOverrun); + + } + + } + +} diff --git a/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java b/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java index 30038ce18cfcb..9e98f3508e79f 100644 --- a/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java +++ b/src/test/java/org/elasticsearch/search/query/SimpleQueryTests.java @@ -473,7 +473,7 @@ public void testOmitTermFreqsAndPositions() throws Exception { client().prepareSearch().setQuery(matchQuery("field1", "quick brown").type(MatchQueryBuilder.Type.PHRASE).slop(0)).get(); fail("SearchPhaseExecutionException should have been thrown"); } catch (SearchPhaseExecutionException e) { - assertTrue(e.getMessage().endsWith("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]; }")); + assertTrue(e.getMessage().contains("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]")); } cluster().wipeIndices("test"); } catch (MapperParsingException ex) { diff --git a/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java b/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java index c9abce165fcd0..f4e296256a1b0 100644 --- a/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java +++ b/src/test/java/org/elasticsearch/search/timeout/SearchTimeoutTests.java @@ -19,16 +19,24 @@ package org.elasticsearch.search.timeout; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.groovy.GroovyScriptEngineService; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.hamcrest.Matchers; import org.junit.Test; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.index.query.FilterBuilders.scriptFilter; -import static org.elasticsearch.index.query.QueryBuilders.filteredQuery; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.*; import static org.hamcrest.Matchers.equalTo; /** @@ -36,6 +44,22 @@ @ElasticsearchIntegrationTest.ClusterScope(scope=ElasticsearchIntegrationTest.Scope.SUITE) public class SearchTimeoutTests extends ElasticsearchIntegrationTest { + + @Override + public Settings indexSettings() { + ImmutableSettings.Builder builder = ImmutableSettings.builder(); + //Timings in this test rely on having 2 shards + builder.put(SETTING_NUMBER_OF_SHARDS, 2).build(); + builder.put(IndexMetaData.SETTING_THOROUGH_TIMEOUT_CHECKS, true).build(); + if (randomizeNumberOfShardsAndReplicas()) { + int numberOfReplicas = numberOfReplicas(); + if (numberOfReplicas >= 0) { + builder.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build(); + } + } + return builder.build(); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(GroovyScriptEngineService.GROOVY_SCRIPT_SANDBOX_ENABLED, false).build(); @@ -51,4 +75,91 @@ public void simpleTimeoutTest() throws Exception { .execute().actionGet(); assertThat(searchResponse.isTimedOut(), equalTo(true)); } + + @Test + public void timedFileAccessTests() throws Exception { + createIndex("test"); + + //Create an index with documents that have test fields that hold delay times required to simulate + // processing the documents' content during a search + int []delays={0,0,0,0,0,10,10,10,10,10,100,100,100,100,100,1000,1000}; + for (int i = 0; i < delays.length; i++) { + client().prepareIndex("test", "type", Integer.toString(i)) + .setSource("delay", delays[i]).execute().actionGet(); + } + client().admin().indices().prepareRefresh().execute().actionGet(); + + //Run a series of searches on docs with different processing delays between doc retrieves + // and expected search timeouts + assertThat(runTimeRestrictedSearch(0,10,1000).isTimedOut(),equalTo(false)); + assertThat(runTimeRestrictedSearch(10,10,1000).isTimedOut(),equalTo(false)); + assertThat(runTimeRestrictedSearch(100,100,200).isTimedOut(),equalTo(true)); + + //Test partial results + //One shard should take <250 ms but the other shard should take >250ms + SearchResponse longResponse = runTimeRestrictedSearch(100,100,250); + assertThat(longResponse.isTimedOut(),equalTo(true)); + SearchHits hits = longResponse.getHits(); + assertTrue("Must have some hits", hits.getTotalHits()>0); + for (SearchHit searchHit : hits) { + Integer delayField = (Integer) searchHit.getSource().get("delay"); + + assertNotNull(delayField); + int delay=delayField.intValue(); + //Must have only retrieved docs before the search timeout threshold + assertThat(delay, Matchers.lessThan(500)); + } + } + @Test + public void timedAggScriptTests() throws Exception { + createIndex("test"); + + // Create an index with documents that have test fields that hold delay times required to simulate + // processing the documents' content during a search + int []delays={50,50,1000,50,50,2000,10,10}; + for (int i = 0; i < delays.length; i++) { + client().prepareIndex("test", "type", Integer.toString(i)) + .setSource("delay", delays[i]).execute().actionGet(); + } + client().admin().indices().prepareRefresh().execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet(); + + //One shard should run 50+50+10ms delay and another 50+1000+50+10ms + SearchResponse oneShardTimedoutResponse = runTimeRestrictedAgg(500, 0, 1000); + assertThat(oneShardTimedoutResponse.isTimedOut(),equalTo(true)); + Aggregations aggs = oneShardTimedoutResponse.getAggregations(); + assertNotNull(aggs); + //One shard should run 50+50+1000+10 delays and another 50+50 + 2000+10 - both > 500ms timeout + SearchResponse bothShardsTimedoutResponse = runTimeRestrictedAgg(500, 0, 2000); + assertThat(bothShardsTimedoutResponse.isTimedOut(),equalTo(true)); + aggs = bothShardsTimedoutResponse.getAggregations(); + assertNull(aggs); + } + + private SearchResponse runTimeRestrictedAgg(int overallTimeout, int fromDelay, int toDelay) { + SearchResponse searchResponse = client().prepareSearch("test") + .setTimeout(overallTimeout+"ms") + .setQuery(rangeQuery("delay").from(fromDelay).to(toDelay)) + //Use _source field in script to force slow evaluations that involve disk access + .addAggregation(new TermsBuilder("delayAgg").script("Integer delay=_source.delay;" + + "Thread.sleep(delay);" + //If needed, explicit checking of timeouts rather than relying on implicit checks on file accesses.. +// + "org.elasticsearch.common.util.concurrent.ActivityTimeMonitor.getCurrentThreadMonitor().checkForTimeout();" + + "return delay")) + .execute().actionGet(); + return searchResponse; + } + + private SearchResponse runTimeRestrictedSearch(int fromDelay, int toDelay, int overallTimeout){ + SearchResponse searchResponse = client().prepareSearch("test") + .setTimeout(overallTimeout+"ms") + .setQuery(filteredQuery(rangeQuery("delay").from(fromDelay).to(toDelay), + //Use _source field in script to force slow evaluations that involve disk access + scriptFilter("Integer delay=_source.delay;" + + "Thread.sleep(delay);" + + "return true;"))) + .execute().actionGet(); + return searchResponse; + } + } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java b/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java index 7078f84519676..9114627d173ab 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchThreadFilter.java @@ -21,7 +21,7 @@ import com.carrotsearch.randomizedtesting.ThreadFilter; import org.elasticsearch.common.network.MulticastChannel; -import org.elasticsearch.test.hamcrest.RegexMatcher; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.tribe.TribeTests; import java.util.regex.Pattern; @@ -49,7 +49,8 @@ public boolean reject(Thread t) { if (threadName.contains("[" + MulticastChannel.SHARED_CHANNEL_NAME + "]") || threadName.contains("[" + ElasticsearchSingleNodeTest.nodeName() + "]") - || threadName.contains("Keep-Alive-Timer")) { + || threadName.contains("Keep-Alive-Timer") + || threadName.contains(ActivityTimeMonitor.TIMEOUT_MONITOR_THREADNAME)) { return true; } return nodePrefix.matcher(t.getName()).find(); diff --git a/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java b/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java index 28cd227e0abff..b229788bf34ea 100644 --- a/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java +++ b/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java @@ -26,16 +26,16 @@ import org.elasticsearch.common.network.MulticastChannel; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ActivityTimeMonitor; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.*; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.ElasticsearchSingleNodeTest; -import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import org.elasticsearch.test.hamcrest.RegexMatcher; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.tribe.TribeTests; @@ -52,7 +52,6 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.*; @@ -114,7 +113,8 @@ public void verifyThreadNames() throws Exception { // or the ones that are occasionally come up from ElasticsearchSingleNodeTest if (threadName.contains("[" + MulticastChannel.SHARED_CHANNEL_NAME + "]") || threadName.contains("[" + ElasticsearchSingleNodeTest.nodeName() + "]") - || threadName.contains("Keep-Alive-Timer")) { + || threadName.contains("Keep-Alive-Timer") + || threadName.contains(ActivityTimeMonitor.TIMEOUT_MONITOR_THREADNAME)) { continue; } String nodePrefix = "(" + Pattern.quote(InternalTestCluster.TRANSPORT_CLIENT_PREFIX) + ")?(" +