diff --git a/docs/reference/migration/migrate_8_0.asciidoc b/docs/reference/migration/migrate_8_0.asciidoc index 9e4532b87ccbe..4478b678a11f3 100644 --- a/docs/reference/migration/migrate_8_0.asciidoc +++ b/docs/reference/migration/migrate_8_0.asciidoc @@ -81,5 +81,6 @@ include::migrate_8_0/http.asciidoc[] include::migrate_8_0/reindex.asciidoc[] include::migrate_8_0/search.asciidoc[] include::migrate_8_0/settings.asciidoc[] +include::migrate_8_0/threadpool.asciidoc[] include::migrate_8_0/indices.asciidoc[] include::migrate_8_0/api.asciidoc[] diff --git a/docs/reference/migration/migrate_8_0/threadpool.asciidoc b/docs/reference/migration/migrate_8_0/threadpool.asciidoc new file mode 100644 index 0000000000000..1c997f45bec6e --- /dev/null +++ b/docs/reference/migration/migrate_8_0/threadpool.asciidoc @@ -0,0 +1,10 @@ +[float] +[[breaking_80_threadpool_changes]] +=== Thread pool changes + +[float] +==== Removal of the `fixed_auto_queue_size` thread pool type + +The `fixed_auto_queue_size` thread pool type, previously marked as an +experimental feature, was deprecated in 7.x and has been removed in 8.0. +The `search` and `search_throttled` thread pools have the `fixed` type now. diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 5a5c5c5e0b748..7d5af937544c4 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -15,14 +15,13 @@ There are several thread pools, but the important ones include: `search`:: For count/search/suggest operations. Thread pool type is - `fixed_auto_queue_size` with a size of - `int((# of available_processors * 3) / 2) + 1`, and initial queue_size of + `fixed` with a size of + `int((# of available_processors * 3) / 2) + 1`, and queue_size of `1000`. [[search-throttled]]`search_throttled`:: For count/search/suggest/get operations on `search_throttled indices`. - Thread pool type is `fixed_auto_queue_size` with a size of `1`, and initial - queue_size of `100`. + Thread pool type is `fixed` with a size of `1`, and queue_size of `100`. `get`:: For get operations. Thread pool type is `fixed` @@ -119,52 +118,6 @@ thread_pool: queue_size: 1000 -------------------------------------------------- -[float] -[[fixed-auto-queue-size]] -==== `fixed_auto_queue_size` - -experimental[] - -The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle -the requests with a bounded queue for pending requests that have no threads to -service them. It's similar to the `fixed` threadpool, however, the `queue_size` -automatically adjusts according to calculations based on -https://en.wikipedia.org/wiki/Little%27s_law[Little's Law]. These calculations -will potentially adjust the `queue_size` up or down by 50 every time -`auto_queue_frame_size` operations have been completed. - -The `size` parameter controls the number of threads. - -The `queue_size` allows to control the initial size of the queue of pending -requests that have no threads to execute them. - -The `min_queue_size` setting controls the minimum amount the `queue_size` can be -adjusted to. - -The `max_queue_size` setting controls the maximum amount the `queue_size` can be -adjusted to. - -The `auto_queue_frame_size` setting controls the number of operations during -which measurement is taken before the queue is adjusted. It should be large -enough that a single operation cannot unduly bias the calculation. - -The `target_response_time` is a time value setting that indicates the targeted -average response time for tasks in the thread pool queue. If tasks are routinely -above this time, the thread pool queue will be adjusted down so that tasks are -rejected. - -[source,yaml] --------------------------------------------------- -thread_pool: - search: - size: 30 - queue_size: 500 - min_queue_size: 10 - max_queue_size: 1000 - auto_queue_frame_size: 2000 - target_response_time: 1s --------------------------------------------------- - [float] [[scaling]] ==== `scaling` diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index f928439657e3d..ed85177cad46f 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -71,7 +71,7 @@ public void testExecutionErrorOnDirectExecutorService() throws InterruptedExcept public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedException { final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1, - EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), randomBoolean()); try { checkExecutionError(getExecuteRunner(fixedExecutor)); checkExecutionError(getSubmitRunner(fixedExecutor)); @@ -91,17 +91,6 @@ public void testExecutionErrorOnScalingESThreadPoolExecutor() throws Interrupted } } - public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException { - final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1, - 1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); - try { - checkExecutionError(getExecuteRunner(autoQueueFixedExecutor)); - checkExecutionError(getSubmitRunner(autoQueueFixedExecutor)); - } finally { - ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS); - } - } - public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException { final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test", EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler()); @@ -180,7 +169,7 @@ public void testExecutionExceptionOnDirectExecutorService() throws InterruptedEx public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws InterruptedException { final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1, - EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), randomBoolean()); try { checkExecutionException(getExecuteRunner(fixedExecutor), true); checkExecutionException(getSubmitRunner(fixedExecutor), false); @@ -200,18 +189,6 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru } } - public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException { - final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1, - 1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); - try { - // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable - checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), true); - checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false); - } finally { - ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS); - } - } - public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException { final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test", EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler()); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutor.java new file mode 100644 index 0000000000000..52b156826fe83 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutor.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * An extension to thread pool executor, which tracks the exponentially weighted moving average of the task execution time. + */ +public final class EWMATrackingEsThreadPoolExecutor extends EsThreadPoolExecutor { + + // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable + public static double EWMA_ALPHA = 0.3; + + private final Function runnableWrapper; + private final ExponentiallyWeightedMovingAverage executionEWMA; + + EWMATrackingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, Function runnableWrapper, + ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) { + super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, + workQueue, threadFactory, handler, contextHolder); + this.runnableWrapper = runnableWrapper; + this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + } + + @Override + protected Runnable wrapRunnable(Runnable command) { + return super.wrapRunnable(this.runnableWrapper.apply(command)); + } + + @Override + protected Runnable unwrap(Runnable runnable) { + final Runnable unwrapped = super.unwrap(runnable); + if (unwrapped instanceof WrappedRunnable) { + return ((WrappedRunnable) unwrapped).unwrap(); + } else { + return unwrapped; + } + } + + /** + * Returns the exponentially weighted moving average of the task execution time + */ + public double getTaskExecutionEWMA() { + return executionEWMA.getAverage(); + } + + /** + * Returns the current queue size (operations that are queued) + */ + public int getCurrentQueueSize() { + return getQueue().size(); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + // A task has been completed, it has left the building. We should now be able to get the + // total time as a combination of the time in the queue and time spent running the task. We + // only want runnables that did not throw errors though, because they could be fast-failures + // that throw off our timings, so only check when t is null. + assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue"; + final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r); + final boolean failedOrRejected = timedRunnable.getFailedOrRejected(); + final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos(); + assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1) : + "expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: " + taskExecutionNanos + + ", failedOrRejected: " + failedOrRejected; + if (taskExecutionNanos != -1) { + // taskExecutionNanos may be -1 if the task threw an exception + executionEWMA.addValue(taskExecutionNanos); + } + } + + @Override + protected void appendThreadPoolExecutorDetails(StringBuilder sb) { + sb.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())).append(", "); + } + +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index dc2df158d3cd8..f12d432ae5cb2 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.Node; import java.util.Arrays; @@ -83,38 +82,20 @@ public static EsThreadPoolExecutor newScaling(String name, int min, int max, lon } public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, - ThreadFactory threadFactory, ThreadContext contextHolder) { + ThreadFactory threadFactory, ThreadContext contextHolder, boolean trackEWMA) { BlockingQueue queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueCapacity); } - return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, - queue, threadFactory, new EsAbortPolicy(), contextHolder); - } - - /** - * Return a new executor that will automatically adjust the queue size based on queue throughput. - * - * @param size number of fixed threads to use for executing tasks - * @param initialQueueCapacity initial size of the executor queue - * @param minQueueSize minimum queue size that the queue can be adjusted to - * @param maxQueueSize maximum queue size that the queue can be adjusted to - * @param frameSize number of tasks during which stats are collected before adjusting queue size - */ - public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, - int maxQueueSize, int frameSize, TimeValue targetedResponseTime, - ThreadFactory threadFactory, ThreadContext contextHolder) { - if (initialQueueCapacity <= 0) { - throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " + - initialQueueCapacity); + if (trackEWMA) { + return new EWMATrackingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, + queue, TimedRunnable::new, threadFactory, new EsAbortPolicy(), contextHolder); + } else { + return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, + queue, threadFactory, new EsAbortPolicy(), contextHolder); } - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), initialQueueCapacity); - return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, - queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory, - new EsAbortPolicy(), contextHolder); } /** diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java deleted file mode 100644 index ac917e72b2d2f..0000000000000 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; -import org.elasticsearch.common.unit.TimeValue; - -import java.util.Locale; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; - -/** - * An extension to thread pool executor, which automatically adjusts the queue size of the - * {@code ResizableBlockingQueue} according to Little's Law. - */ -public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor { - - // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable - public static double EWMA_ALPHA = 0.3; - - private static final Logger logger = LogManager.getLogger(QueueResizingEsThreadPoolExecutor.class); - // The amount the queue size is adjusted by for each calcuation - private static final int QUEUE_ADJUSTMENT_AMOUNT = 50; - - private final Function runnableWrapper; - private final ResizableBlockingQueue workQueue; - private final int tasksPerFrame; - private final int minQueueSize; - private final int maxQueueSize; - private final long targetedResponseTimeNanos; - private final ExponentiallyWeightedMovingAverage executionEWMA; - - private final AtomicLong totalTaskNanos = new AtomicLong(0); - private final AtomicInteger taskCount = new AtomicInteger(0); - - private long startNs; - - QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - ResizableBlockingQueue workQueue, int minQueueSize, int maxQueueSize, - Function runnableWrapper, final int tasksPerFrame, - TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler, - ThreadContext contextHolder) { - super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, - workQueue, threadFactory, handler, contextHolder); - this.runnableWrapper = runnableWrapper; - this.workQueue = workQueue; - this.tasksPerFrame = tasksPerFrame; - this.startNs = System.nanoTime(); - this.minQueueSize = minQueueSize; - this.maxQueueSize = maxQueueSize; - this.targetedResponseTimeNanos = targetedResponseTime.getNanos(); - this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); - logger.debug( - "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), QUEUE_ADJUSTMENT_AMOUNT); - } - - @Override - protected Runnable wrapRunnable(Runnable command) { - return super.wrapRunnable(this.runnableWrapper.apply(command)); - } - - @Override - protected Runnable unwrap(Runnable runnable) { - final Runnable unwrapped = super.unwrap(runnable); - if (unwrapped instanceof WrappedRunnable) { - return ((WrappedRunnable) unwrapped).unwrap(); - } else { - return unwrapped; - } - } - - /** - * Calculate task rate (λ), for a fixed number of tasks and time it took those tasks to be measured - * - * @param totalNumberOfTasks total number of tasks that were measured - * @param totalFrameTaskNanos nanoseconds during which the tasks were received - * @return the rate of tasks in the system - */ - static double calculateLambda(final int totalNumberOfTasks, final long totalFrameTaskNanos) { - assert totalFrameTaskNanos > 0 : "cannot calculate for instantaneous tasks, got: " + totalFrameTaskNanos; - assert totalNumberOfTasks > 0 : "cannot calculate for no tasks, got: " + totalNumberOfTasks; - // There is no set execution time, instead we adjust the time window based on the - // number of completed tasks, so there is no background thread required to update the - // queue size at a regular interval. This means we need to calculate our λ by the - // total runtime, rather than a fixed interval. - - // λ = total tasks divided by measurement time - return (double) totalNumberOfTasks / totalFrameTaskNanos; - } - - /** - * Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time. - * - * @param lambda the arrival rate of tasks in nanoseconds - * @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests - * @return the optimal queue size for the give task rate and targeted response time - */ - static int calculateL(final double lambda, final long targetedResponseTimeNanos) { - assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests"; - // L = λ * W - return Math.toIntExact((long)(lambda * targetedResponseTimeNanos)); - } - - /** - * Returns the exponentially weighted moving average of the task execution time - */ - public double getTaskExecutionEWMA() { - return executionEWMA.getAverage(); - } - - /** - * Returns the current queue size (operations that are queued) - */ - public int getCurrentQueueSize() { - return workQueue.size(); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - // A task has been completed, it has left the building. We should now be able to get the - // total time as a combination of the time in the queue and time spent running the task. We - // only want runnables that did not throw errors though, because they could be fast-failures - // that throw off our timings, so only check when t is null. - assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue"; - final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r); - final long taskNanos = timedRunnable.getTotalNanos(); - final boolean failedOrRejected = timedRunnable.getFailedOrRejected(); - final long totalNanos = totalTaskNanos.addAndGet(taskNanos); - - final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos(); - assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1) : - "expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: " + taskExecutionNanos + - ", failedOrRejected: " + failedOrRejected; - if (taskExecutionNanos != -1) { - // taskExecutionNanos may be -1 if the task threw an exception - executionEWMA.addValue(taskExecutionNanos); - } - - if (taskCount.incrementAndGet() == this.tasksPerFrame) { - final long endTimeNs = System.nanoTime(); - final long totalRuntime = endTimeNs - this.startNs; - // Reset the start time for all tasks. At first glance this appears to need to be - // volatile, since we are reading from a different thread when it is set, but it - // is protected by the taskCount memory barrier. - // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html - startNs = endTimeNs; - - // Calculate the new desired queue size - try { - final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L)); - final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos); - final int oldCapacity = workQueue.capacity(); - - if (logger.isDebugEnabled()) { - final long avgTaskTime = totalNanos / tasksPerFrame; - logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " + - "[{} tasks/s], optimal queue is [{}], current capacity [{}]", - getName(), - tasksPerFrame, - TimeValue.timeValueNanos(totalRuntime), - TimeValue.timeValueNanos(avgTaskTime), - TimeValue.timeValueNanos((long)executionEWMA.getAverage()), - String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()), - desiredQueueSize, - oldCapacity); - } - - // Adjust the queue size towards the desired capacity using an adjust of - // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max - // values the queue size can have. - final int newCapacity = - workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize); - if (oldCapacity != newCapacity && logger.isDebugEnabled()) { - logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(), - newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT, - oldCapacity, newCapacity); - } - } catch (ArithmeticException e) { - // There was an integer overflow, so just log about it, rather than adjust the queue size - logger.warn(() -> new ParameterizedMessage( - "failed to calculate optimal queue size for [{}] thread pool, " + - "total frame time [{}ns], tasks [{}], task execution time [{}ns]", - getName(), totalRuntime, tasksPerFrame, totalNanos), - e); - } finally { - // Finally, decrement the task count and time back to their starting values. We - // do this at the end so there is no concurrent adjustments happening. We also - // decrement them instead of resetting them back to zero, as resetting them back - // to zero causes operations that came in during the adjustment to be uncounted - int tasks = taskCount.addAndGet(-this.tasksPerFrame); - assert tasks >= 0 : "tasks should never be negative, got: " + tasks; - - if (tasks >= this.tasksPerFrame) { - // Start over, because we can potentially reach a "never adjusting" state, - // - // consider the following: - // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10) - // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25 - // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15 - // - Since taskCount will now be incremented forever, it will never be 10 again, - // so there will be no further adjustments - logger.debug( - "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName()); - totalTaskNanos.getAndSet(1); - taskCount.getAndSet(0); - startNs = System.nanoTime(); - } else { - // Do a regular adjustment - totalTaskNanos.addAndGet(-totalNanos); - } - } - } - } - - @Override - protected void appendThreadPoolExecutorDetails(StringBuilder sb) { - sb.append("min queue capacity = ").append(minQueueSize).append(", "); - sb.append("max queue capacity = ").append(maxQueueSize).append(", "); - sb.append("frame size = ").append(tasksPerFrame).append(", "); - sb.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", "); - sb.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())).append(", "); - sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", "); - } - -} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java deleted file mode 100644 index 57240b6e1cfd5..0000000000000 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent; - -import java.util.concurrent.BlockingQueue; - -/** - * Extends the {@code SizeBlockingQueue} to add the {@code adjustCapacity} method, which will adjust - * the capacity by a certain amount towards a maximum or minimum. - */ -final class ResizableBlockingQueue extends SizeBlockingQueue { - - private volatile int capacity; - - ResizableBlockingQueue(BlockingQueue queue, int initialCapacity) { - super(queue, initialCapacity); - this.capacity = initialCapacity; - } - - @Override - public int capacity() { - return this.capacity; - } - - @Override - public int remainingCapacity() { - return Math.max(0, this.capacity()); - } - - /** Resize the limit for the queue, returning the new size limit */ - public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) { - assert adjustmentAmount > 0 : "adjustment amount should be a positive value"; - assert optimalCapacity >= 0 : "desired capacity cannot be negative"; - assert minCapacity >= 0 : "cannot have min capacity smaller than 0"; - assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity"; - - if (optimalCapacity == capacity) { - // Yahtzee! - return this.capacity; - } - - if (optimalCapacity > capacity + adjustmentAmount) { - // adjust up - final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount); - this.capacity = newCapacity; - return newCapacity; - } else if (optimalCapacity < capacity - adjustmentAmount) { - // adjust down - final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount); - this.capacity = newCapacity; - return newCapacity; - } else { - return this.capacity; - } - } -} diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 6fdc0b43aaa2d..89e0ff5c9fb66 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -277,7 +277,7 @@ static class AsyncLucenePersistedState extends InMemoryPersistedState { nodeName + "/" + THREAD_NAME, 1, 1, daemonThreadFactory(nodeName, THREAD_NAME), - threadPool.getThreadContext()); + threadPool.getThreadContext(), false); this.persistedState = persistedState; } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 2374e59b0bde5..64e274e734d0c 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -51,7 +51,8 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; -import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType; @@ -301,8 +302,11 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe } ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - if (executor instanceof QueueResizingEsThreadPoolExecutor) { - QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor; + assert executor instanceof EWMATrackingEsThreadPoolExecutor || + (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) : + "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); + if (executor instanceof EWMATrackingEsThreadPoolExecutor) { + EWMATrackingEsThreadPoolExecutor rExecutor = (EWMATrackingEsThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java deleted file mode 100644 index 64f2b2a1fd8bd..0000000000000 --- a/server/src/main/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilder.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool; - -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.SizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.node.Node; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; - -/** - * A builder for executors that automatically adjust the queue length as needed, depending on - * Little's Law. See https://en.wikipedia.org/wiki/Little's_law for more information. - */ -public final class AutoQueueAdjustingExecutorBuilder extends ExecutorBuilder { - - private final Setting sizeSetting; - private final Setting queueSizeSetting; - private final Setting minQueueSizeSetting; - private final Setting maxQueueSizeSetting; - private final Setting targetedResponseTimeSetting; - private final Setting frameSizeSetting; - - AutoQueueAdjustingExecutorBuilder(final Settings settings, final String name, final int size, - final int initialQueueSize, final int minQueueSize, - final int maxQueueSize, final int frameSize) { - super(name); - final String prefix = "thread_pool." + name; - final String sizeKey = settingsKey(prefix, "size"); - this.sizeSetting = - new Setting<>( - sizeKey, - s -> Integer.toString(size), - s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), - Setting.Property.NodeScope); - final String queueSizeKey = settingsKey(prefix, "queue_size"); - final String minSizeKey = settingsKey(prefix, "min_queue_size"); - final String maxSizeKey = settingsKey(prefix, "max_queue_size"); - final String frameSizeKey = settingsKey(prefix, "auto_queue_frame_size"); - final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time"); - this.targetedResponseTimeSetting = Setting.timeSetting(targetedResponseTimeKey, TimeValue.timeValueSeconds(1), - TimeValue.timeValueMillis(10), Setting.Property.NodeScope); - this.queueSizeSetting = Setting.intSetting(queueSizeKey, initialQueueSize, Setting.Property.NodeScope); - // These temp settings are used to validate the min and max settings below - Setting tempMaxQueueSizeSetting = Setting.intSetting(maxSizeKey, maxQueueSize, Setting.Property.NodeScope); - Setting tempMinQueueSizeSetting = Setting.intSetting(minSizeKey, minQueueSize, Setting.Property.NodeScope); - - this.minQueueSizeSetting = new Setting<>( - minSizeKey, - Integer.toString(minQueueSize), - s -> Setting.parseInt(s, 0, minSizeKey), - new Setting.Validator<>() { - - @Override - public void validate(final Integer value) { - - } - - @Override - public void validate(final Integer value, final Map, Object> settings) { - if (value > (int) settings.get(tempMaxQueueSizeSetting)) { - throw new IllegalArgumentException("Failed to parse value [" + value + "] for setting [" + minSizeKey - + "] must be <= " + settings.get(tempMaxQueueSizeSetting)); - } - } - - @Override - public Iterator> settings() { - final List> settings = List.of(tempMaxQueueSizeSetting); - return settings.iterator(); - } - - }, - Setting.Property.NodeScope); - this.maxQueueSizeSetting = new Setting<>( - maxSizeKey, - Integer.toString(maxQueueSize), - s -> Setting.parseInt(s, 0, maxSizeKey), - new Setting.Validator() { - - @Override - public void validate(Integer value) { - - } - - @Override - public void validate(final Integer value, final Map, Object> settings) { - if (value < (int) settings.get(tempMinQueueSizeSetting)) { - throw new IllegalArgumentException("Failed to parse value [" + value + "] for setting [" + minSizeKey - + "] must be >= " + settings.get(tempMinQueueSizeSetting)); - } - } - - @Override - public Iterator> settings() { - final List> settings = List.of(tempMinQueueSizeSetting); - return settings.iterator(); - } - - }, - Setting.Property.NodeScope); - this.frameSizeSetting = Setting.intSetting(frameSizeKey, frameSize, 100, Setting.Property.NodeScope); - } - - @Override - public List> getRegisteredSettings() { - return Arrays.asList(sizeSetting, queueSizeSetting, minQueueSizeSetting, - maxQueueSizeSetting, frameSizeSetting, targetedResponseTimeSetting); - } - - @Override - AutoExecutorSettings getSettings(Settings settings) { - final String nodeName = Node.NODE_NAME_SETTING.get(settings); - final int size = sizeSetting.get(settings); - final int initialQueueSize = queueSizeSetting.get(settings); - final int minQueueSize = minQueueSizeSetting.get(settings); - final int maxQueueSize = maxQueueSizeSetting.get(settings); - final int frameSize = frameSizeSetting.get(settings); - final TimeValue targetedResponseTime = targetedResponseTimeSetting.get(settings); - return new AutoExecutorSettings(nodeName, size, initialQueueSize, minQueueSize, maxQueueSize, frameSize, targetedResponseTime); - } - - @Override - ThreadPool.ExecutorHolder build(final AutoExecutorSettings settings, - final ThreadContext threadContext) { - int size = settings.size; - int initialQueueSize = settings.initialQueueSize; - int minQueueSize = settings.minQueueSize; - int maxQueueSize = settings.maxQueueSize; - int frameSize = settings.frameSize; - TimeValue targetedResponseTime = settings.targetedResponseTime; - final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); - final ExecutorService executor = - EsExecutors.newAutoQueueFixed( - settings.nodeName + "/" + name(), - size, - initialQueueSize, - minQueueSize, - maxQueueSize, - frameSize, - targetedResponseTime, - threadFactory, - threadContext); - // TODO: in a subsequent change we hope to extend ThreadPool.Info to be more specific for the thread pool type - final ThreadPool.Info info = - new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE, - size, size, null, new SizeValue(initialQueueSize)); - return new ThreadPool.ExecutorHolder(executor, info); - } - - @Override - String formatInfo(ThreadPool.Info info) { - return String.format( - Locale.ROOT, - "name [%s], size [%d], queue size [%s]", - info.getName(), - info.getMax(), - info.getQueueSize() == null ? "unbounded" : info.getQueueSize()); - } - - static final class AutoExecutorSettings extends ExecutorBuilder.ExecutorSettings { - - final int size; - final int initialQueueSize; - final int minQueueSize; - final int maxQueueSize; - final int frameSize; - final TimeValue targetedResponseTime; - - AutoExecutorSettings(final String nodeName, final int size, final int initialQueueSize, - final int minQueueSize, final int maxQueueSize, final int frameSize, - final TimeValue targetedResponseTime) { - super(nodeName); - this.size = size; - this.initialQueueSize = initialQueueSize; - this.minQueueSize = minQueueSize; - this.maxQueueSize = maxQueueSize; - this.frameSize = frameSize; - this.targetedResponseTime = targetedResponseTime; - } - - } - -} diff --git a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java index 43da1044c6bd0..d8b405da37517 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java @@ -39,6 +39,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder sizeSetting; private final Setting queueSizeSetting; + private final boolean trackEWMA; /** * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name. @@ -47,9 +48,10 @@ public final class FixedExecutorBuilder extends ExecutorBuilder executors; @@ -164,24 +164,22 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); - builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200)); - builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); - builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); - builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, - Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); - builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings, - Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)); + builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200, false)); + builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000, false)); + builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false)); + builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, true)); + builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side - builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1)); + builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, false)); builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); - builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); + builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false)); builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); for (final ExecutorBuilder builder : customBuilders) { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutorTests.java new file mode 100644 index 0000000000000..d046d798136e1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutorTests.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests} + * based on the time taken for each event. + */ +public class EWMATrackingEsThreadPoolExecutorTests extends ESTestCase { + + public void testExecutionEWMACalculation() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + + EWMATrackingEsThreadPoolExecutor executor = + new EWMATrackingEsThreadPoolExecutor( + "test-threadpool", 1, 1, 1000, + TimeUnit.MILLISECONDS, ConcurrentCollections.newBlockingQueue(), fastWrapper(), + EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(30L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(51L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(65L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(75L)); + }); + executeTask(executor, 1); + assertBusy(() -> { + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(83L)); + }); + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + /** Use a runnable wrapper that simulates a task with unknown failures. */ + public void testExceptionThrowingTask() throws Exception { + ThreadContext context = new ThreadContext(Settings.EMPTY); + EWMATrackingEsThreadPoolExecutor executor = + new EWMATrackingEsThreadPoolExecutor( + "test-threadpool", 1, 1, 1000, + TimeUnit.MILLISECONDS, ConcurrentCollections.newBlockingQueue(), exceptionalWrapper(), + EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); + executor.prestartAllCoreThreads(); + logger.info("--> executor: {}", executor); + + assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); + executeTask(executor, 1); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + private Function fastWrapper() { + return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false); + } + + /** + * The returned function outputs a WrappedRunnabled that simulates the case + * where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because + * the job failed or was rejected before it finished. + */ + private Function exceptionalWrapper() { + return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true); + } + + /** Execute a blank task {@code times} times for the executor */ + private void executeTask(EWMATrackingEsThreadPoolExecutor executor, int times) { + logger.info("--> executing a task [{}] times", times); + for (int i = 0; i < times; i++) { + executor.execute(() -> {}); + } + } + + public class SettableTimedRunnable extends TimedRunnable { + private final long timeTaken; + private final boolean testFailedOrRejected; + + public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) { + super(() -> {}); + this.timeTaken = timeTaken; + this.testFailedOrRejected = failedOrRejected; + } + + @Override + public long getTotalNanos() { + return timeTaken; + } + + @Override + public long getTotalExecutionNanos() { + return timeTaken; + } + + @Override + public boolean getFailedOrRejected() { + return testFailedOrRejected; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 1fa67a4c3fa0a..cc0d1e36d22bf 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -53,7 +53,7 @@ private String getName() { public void testFixedForcedExecution() throws Exception { EsThreadPoolExecutor executor = - EsExecutors.newFixed(getName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext); + EsExecutors.newFixed(getName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext, randomBoolean()); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -116,7 +116,7 @@ public void onFailure(Exception e) { public void testFixedRejected() throws Exception { EsThreadPoolExecutor executor = - EsExecutors.newFixed(getName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext); + EsExecutors.newFixed(getName(), 1, 1, EsExecutors.daemonThreadFactory("test"), threadContext, randomBoolean()); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -249,7 +249,7 @@ public void testRejectionMessageAndShuttingDownFlag() throws InterruptedExceptio int actions = queue + pool; final CountDownLatch latch = new CountDownLatch(1); EsThreadPoolExecutor executor = - EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext); + EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext, randomBoolean()); try { for (int i = 0; i < actions; i++) { executor.execute(new Runnable() { @@ -339,7 +339,7 @@ public void testInheritContext() throws InterruptedException { final Integer one = Integer.valueOf(1); threadContext.putTransient("foo", one); EsThreadPoolExecutor executor = - EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext); + EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext, randomBoolean()); try { executor.execute(() -> { try { @@ -370,7 +370,7 @@ public void testGetTasks() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch executed = new CountDownLatch(1); EsThreadPoolExecutor executor = - EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext); + EsExecutors.newFixed(getName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext, randomBoolean()); try { Runnable r = () -> { latch.countDown(); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java index 75a2e29946179..be3d5f0b9bd58 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java @@ -80,7 +80,7 @@ public void testConcurrentListenerRegistrationAndCompletion() throws BrokenBarri final int completingThread = randomIntBetween(0, numberOfThreads - 1); final ListenableFuture future = new ListenableFuture<>(); executorService = EsExecutors.newFixed("testConcurrentListenerRegistrationAndCompletion", numberOfThreads, 1000, - EsExecutors.daemonThreadFactory("listener"), threadContext); + EsExecutors.daemonThreadFactory("listener"), threadContext, false); final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); final CountDownLatch listenersLatch = new CountDownLatch(numberOfThreads - 1); final AtomicInteger numResponses = new AtomicInteger(0); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java deleted file mode 100644 index e08a6ca443315..0000000000000 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ESTestCase; - -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; - -/** - * Tests for the automatic queue resizing of the {@code QueueResizingEsThreadPoolExecutorTests} - * based on the time taken for each event. - */ -public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase { - - public void testExactWindowSizeAdjustment() throws Exception { - ThreadContext context = new ThreadContext(Settings.EMPTY); - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), 100); - - int threads = randomIntBetween(1, 3); - int measureWindow = 3; - logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); - QueueResizingEsThreadPoolExecutor executor = - new QueueResizingEsThreadPoolExecutor( - "test-threadpool", threads, threads, 1000, - TimeUnit.MILLISECONDS, queue, 10, 1000, fastWrapper(), - measureWindow, TimeValue.timeValueMillis(1), EsExecutors.daemonThreadFactory("queuetest"), - new EsAbortPolicy(), context); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - // Execute exactly 3 (measureWindow) times - executor.execute(() -> {}); - executor.execute(() -> {}); - executor.execute(() -> {}); - - // The queue capacity should have increased by 50 since they were very fast tasks - assertBusy(() -> { - assertThat(queue.capacity(), equalTo(150)); - }); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - - public void testAutoQueueSizingUp() throws Exception { - ThreadContext context = new ThreadContext(Settings.EMPTY); - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), - 2000); - - int threads = randomIntBetween(1, 10); - int measureWindow = randomIntBetween(100, 200); - logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); - QueueResizingEsThreadPoolExecutor executor = - new QueueResizingEsThreadPoolExecutor( - "test-threadpool", threads, threads, 1000, - TimeUnit.MILLISECONDS, queue, 10, 3000, fastWrapper(), - measureWindow, TimeValue.timeValueMillis(1), EsExecutors.daemonThreadFactory("queuetest"), - new EsAbortPolicy(), context); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - // Execute a task multiple times that takes 1ms - executeTask(executor, (measureWindow * 5) + 2); - - assertBusy(() -> { - assertThat(queue.capacity(), greaterThan(2000)); - }); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - - public void testAutoQueueSizingDown() throws Exception { - ThreadContext context = new ThreadContext(Settings.EMPTY); - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), - 2000); - - int threads = randomIntBetween(1, 10); - int measureWindow = randomIntBetween(100, 200); - logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); - QueueResizingEsThreadPoolExecutor executor = - new QueueResizingEsThreadPoolExecutor( - "test-threadpool", threads, threads, 1000, - TimeUnit.MILLISECONDS, queue, 10, 3000, slowWrapper(), measureWindow, TimeValue.timeValueMillis(1), - EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - // Execute a task multiple times that takes 1m - executeTask(executor, (measureWindow * 5) + 2); - - assertBusy(() -> { - assertThat(queue.capacity(), lessThan(2000)); - }); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - - public void testAutoQueueSizingWithMin() throws Exception { - ThreadContext context = new ThreadContext(Settings.EMPTY); - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), - 5000); - - int threads = randomIntBetween(1, 5); - int measureWindow = randomIntBetween(10, 100); - int min = randomIntBetween(4981, 4999); - logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); - QueueResizingEsThreadPoolExecutor executor = - new QueueResizingEsThreadPoolExecutor( - "test-threadpool", threads, threads, 1000, - TimeUnit.MILLISECONDS, queue, min, 100000, slowWrapper(), measureWindow, TimeValue.timeValueMillis(1), - EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - // Execute a task multiple times that takes 1m - executeTask(executor, (measureWindow * 5)); - - // The queue capacity should decrease, but no lower than the minimum - assertBusy(() -> { - assertThat(queue.capacity(), equalTo(min)); - }); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - - public void testAutoQueueSizingWithMax() throws Exception { - ThreadContext context = new ThreadContext(Settings.EMPTY); - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), - 5000); - - int threads = randomIntBetween(1, 5); - int measureWindow = randomIntBetween(10, 100); - int max = randomIntBetween(5010, 5024); - logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow); - QueueResizingEsThreadPoolExecutor executor = - new QueueResizingEsThreadPoolExecutor( - "test-threadpool", threads, threads, 1000, - TimeUnit.MILLISECONDS, queue, 10, max, fastWrapper(), measureWindow, TimeValue.timeValueMillis(1), - EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - // Execute a task multiple times that takes 1ms - executeTask(executor, measureWindow * 3); - - // The queue capacity should increase, but no higher than the maximum - assertBusy(() -> { - assertThat(queue.capacity(), equalTo(max)); - }); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - - public void testExecutionEWMACalculation() throws Exception { - ThreadContext context = new ThreadContext(Settings.EMPTY); - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), - 100); - - QueueResizingEsThreadPoolExecutor executor = - new QueueResizingEsThreadPoolExecutor( - "test-threadpool", 1, 1, 1000, - TimeUnit.MILLISECONDS, queue, 10, 200, fastWrapper(), 10, TimeValue.timeValueMillis(1), - EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); - executeTask(executor, 1); - assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(30L)); - }); - executeTask(executor, 1); - assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(51L)); - }); - executeTask(executor, 1); - assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(65L)); - }); - executeTask(executor, 1); - assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(75L)); - }); - executeTask(executor, 1); - assertBusy(() -> { - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(83L)); - }); - - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - - /** Use a runnable wrapper that simulates a task with unknown failures. */ - public void testExceptionThrowingTask() throws Exception { - ThreadContext context = new ThreadContext(Settings.EMPTY); - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), - 100); - - QueueResizingEsThreadPoolExecutor executor = - new QueueResizingEsThreadPoolExecutor( - "test-threadpool", 1, 1, 1000, - TimeUnit.MILLISECONDS, queue, 10, 200, exceptionalWrapper(), 10, TimeValue.timeValueMillis(1), - EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context); - executor.prestartAllCoreThreads(); - logger.info("--> executor: {}", executor); - - assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L)); - executeTask(executor, 1); - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - } - - private Function fastWrapper() { - return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false); - } - - private Function slowWrapper() { - return (runnable) -> new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2), false); - } - - /** - * The returned function outputs a WrappedRunnabled that simulates the case - * where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because - * the job failed or was rejected before it finished. - */ - private Function exceptionalWrapper() { - return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true); - } - - /** Execute a blank task {@code times} times for the executor */ - private void executeTask(QueueResizingEsThreadPoolExecutor executor, int times) { - logger.info("--> executing a task [{}] times", times); - for (int i = 0; i < times; i++) { - executor.execute(() -> {}); - } - } - - public class SettableTimedRunnable extends TimedRunnable { - private final long timeTaken; - private final boolean testFailedOrRejected; - - public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) { - super(() -> {}); - this.timeTaken = timeTaken; - this.testFailedOrRejected = failedOrRejected; - } - - @Override - public long getTotalNanos() { - return timeTaken; - } - - @Override - public long getTotalExecutionNanos() { - return timeTaken; - } - - @Override - public boolean getFailedOrRejected() { - return testFailedOrRejected; - } - } -} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueueTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueueTests.java deleted file mode 100644 index b1d5b9bc1bccf..0000000000000 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueueTests.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util.concurrent; - -import org.elasticsearch.test.ESTestCase; - -import static org.hamcrest.Matchers.equalTo; - -public class ResizableBlockingQueueTests extends ESTestCase { - - public void testAdjustCapacity() throws Exception { - ResizableBlockingQueue queue = - new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), - 100); - - assertThat(queue.capacity(), equalTo(100)); - // Queue size already equal to desired capacity - queue.adjustCapacity(100, 25, 1, 1000); - assertThat(queue.capacity(), equalTo(100)); - // Not worth adjusting - queue.adjustCapacity(99, 25, 1, 1000); - assertThat(queue.capacity(), equalTo(100)); - // Not worth adjusting - queue.adjustCapacity(75, 25, 1, 1000); - assertThat(queue.capacity(), equalTo(100)); - queue.adjustCapacity(74, 25, 1, 1000); - assertThat(queue.capacity(), equalTo(75)); - queue.adjustCapacity(1000000, 25, 1, 1000); - assertThat(queue.capacity(), equalTo(100)); - queue.adjustCapacity(1, 25, 80, 1000); - assertThat(queue.capacity(), equalTo(80)); - queue.adjustCapacity(1000000, 25, 80, 100); - assertThat(queue.capacity(), equalTo(100)); - } -} diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 8ddb59ef5db1a..e0dd8c86b824c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -136,7 +136,8 @@ public void setUpThreadPool() { } else { // verify that both sending and receiving files can be completed with a single thread threadPool = new TestThreadPool(getTestName(), - new FixedExecutorBuilder(Settings.EMPTY, "recovery_executor", between(1, 16), between(16, 128), "recovery_executor")); + new FixedExecutorBuilder(Settings.EMPTY, "recovery_executor", between(1, 16), between(16, 128), "recovery_executor", + false)); recoveryExecutor = threadPool.executor("recovery_executor"); } } diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 106ec4efcaca0..cc75e8bef5f48 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -70,7 +70,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.action.search.SearchShardTask; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; @@ -98,18 +97,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class QueryPhaseTests extends IndexShardTestCase { private IndexShard indexShard; - @Override - public Settings threadPoolSettings() { - return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.search.min_queue_size", 10).build(); - } - @Override public void setUp() throws Exception { super.setUp(); diff --git a/server/src/test/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java b/server/src/test/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java deleted file mode 100644 index ef5f87b940c40..0000000000000 --- a/server/src/test/java/org/elasticsearch/threadpool/AutoQueueAdjustingExecutorBuilderTests.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.threadpool; - -import org.elasticsearch.common.settings.Settings; - -import static org.hamcrest.CoreMatchers.containsString; - -public class AutoQueueAdjustingExecutorBuilderTests extends ESThreadPoolTestCase { - - public void testValidatingMinMaxSettings() { - Settings settings = Settings.builder() - .put("thread_pool.test.min_queue_size", randomIntBetween(30, 100)) - .put("thread_pool.test.max_queue_size", randomIntBetween(1,25)) - .build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 10); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("Failed to parse value")); - } - - settings = Settings.builder() - .put("thread_pool.test.min_queue_size", 10) - .put("thread_pool.test.max_queue_size", 9) - .build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [10] for setting [thread_pool.test.min_queue_size] must be <= 9"); - } - - settings = Settings.builder() - .put("thread_pool.test.min_queue_size", 11) - .put("thread_pool.test.max_queue_size", 10) - .build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 1, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [11] for setting [thread_pool.test.min_queue_size] must be <= 10"); - } - - settings = Settings.builder() - .put("thread_pool.test.min_queue_size", 101) - .build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 100, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [101] for setting [thread_pool.test.min_queue_size] must be <= 100"); - } - - settings = Settings.builder() - .put("thread_pool.test.max_queue_size", 99) - .build(); - try { - new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 15, 100, 100, 2000).getSettings(settings); - fail("should have thrown an exception"); - } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Failed to parse value [100] for setting [thread_pool.test.min_queue_size] must be <= 99"); - } - } - - public void testSetLowerSettings() { - Settings settings = Settings.builder() - .put("thread_pool.test.min_queue_size", 10) - .put("thread_pool.test.max_queue_size", 10) - .build(); - AutoQueueAdjustingExecutorBuilder test = new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 1000, 1000, 1000, 2000); - AutoQueueAdjustingExecutorBuilder.AutoExecutorSettings s = test.getSettings(settings); - assertEquals(10, s.maxQueueSize); - assertEquals(10, s.minQueueSize); - } - - public void testSetHigherSettings() { - Settings settings = Settings.builder() - .put("thread_pool.test.min_queue_size", 2000) - .put("thread_pool.test.max_queue_size", 3000) - .build(); - AutoQueueAdjustingExecutorBuilder test = new AutoQueueAdjustingExecutorBuilder(settings, "test", 1, 1000, 1000, 1000, 2000); - AutoQueueAdjustingExecutorBuilder.AutoExecutorSettings s = test.getSettings(settings); - assertEquals(3000, s.maxQueueSize); - assertEquals(2000, s.minQueueSize); - } - -} diff --git a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index c004ed9b3bcc8..c51aa831a0476 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -187,7 +187,7 @@ public void testCustomThreadPool() throws Exception { EsExecutors.numberOfProcessors(Settings.EMPTY), TimeValue.timeValueMinutes(1)); - final FixedExecutorBuilder fixed = new FixedExecutorBuilder(Settings.EMPTY, "my_pool2", 1, 1); + final FixedExecutorBuilder fixed = new FixedExecutorBuilder(Settings.EMPTY, "my_pool2", 1, 1, false); threadPool = new ThreadPool(Settings.builder().put("node.name", "testCustomThreadPool").build(), scaling, fixed); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index b144cc8621b13..240190d5f6d9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1615,12 +1615,6 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()) .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file"); - if (rarely()) { - // Sometimes adjust the minimum search thread pool size, causing - // QueueResizingEsThreadPoolExecutor to be used instead of a regular - // fixed thread pool - builder.put("thread_pool.search.min_queue_size", 100); - } return builder.build(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 67907181699ff..643ca8f055a96 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -334,7 +334,8 @@ public List> getExecutorBuilders(Settings settings) { return Collections.emptyList(); } - return Collections.singletonList(new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, 32, 100, "xpack.ccr.ccr_thread_pool")); + return Collections.singletonList( + new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, 32, 100, "xpack.ccr.ccr_thread_pool", false)); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java index 5f021a36c05df..412116b2a3411 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java @@ -58,7 +58,8 @@ public void initialize() throws IOException { when(outputStream.read(new byte[512])).thenReturn(-1); restoreStream = mock(OutputStream.class); onProcessCrash = mock(Consumer.class); - executorService = EsExecutors.newFixed("test", 1, 1, EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); + executorService = EsExecutors.newFixed("test", 1, 1, EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY), + false); } @After diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 4ffe0ea3e417a..722e6d34caaeb 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -158,7 +158,7 @@ public List> getExecutorBuilders(Settings settings) { } FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, Rollup.TASK_THREAD_POOL_NAME, - 4, 4, "xpack.rollup.task_thread_pool"); + 4, 4, "xpack.rollup.task_thread_pool", false); return Collections.singletonList(indexing); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index d5f9aaf064830..8bb3ccc8fbe7b 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -970,7 +970,8 @@ public UnaryOperator getRestHandlerWrapper(ThreadContext threadCont public List> getExecutorBuilders(final Settings settings) { if (enabled) { return Collections.singletonList( - new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool")); + new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool", + false)); } return Collections.emptyList(); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java index 3a9399494e9b9..c4e70071aec68 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java @@ -200,7 +200,8 @@ public void init() throws Exception { auditTrail = mock(AuditTrailService.class); client = mock(Client.class); threadPool = new ThreadPool(settings, - new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool")); + new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool", + false)); threadContext = threadPool.getThreadContext(); when(client.threadPool()).thenReturn(threadPool); when(client.settings()).thenReturn(settings); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java index 4983310a3248d..4e8d270b89d57 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java @@ -155,7 +155,8 @@ public void tearDown() throws Exception { @BeforeClass public static void startThreadPool() throws IOException { threadPool = new ThreadPool(settings, - new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool")); + new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool", + false)); new Authentication(new User("foo"), new RealmRef("realm", "type", "node"), null).writeToContext(threadPool.getThreadContext()); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 8c71f0b9c6e65..12f82b6427171 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -222,7 +222,8 @@ public List> getExecutorBuilders(Settings settings) { return emptyList(); } - FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool"); + FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool", + false); return Collections.singletonList(indexing); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 17a98b7532b88..4f1d5f8675a85 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -492,7 +492,8 @@ public List> getExecutorBuilders(final Settings settings) { InternalWatchExecutor.THREAD_POOL_NAME, getWatcherThreadPoolSize(settings), 1000, - "xpack.watcher.thread_pool"); + "xpack.watcher.thread_pool", + false); return Collections.singletonList(builder); } return Collections.emptyList(); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 32031e78f5e46..e7dd6c06f28d6 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -101,7 +101,7 @@ public class WatcherService { ExecutionService executionService, WatchParser parser, Client client) { this(settings, triggerService, triggeredWatchStore, executionService, parser, client, EsExecutors.newFixed(LIFECYCLE_THREADPOOL_NAME, 1, 1000, daemonThreadFactory(settings, LIFECYCLE_THREADPOOL_NAME), - client.threadPool().getThreadContext())); + client.threadPool().getThreadContext(), false)); } /**