Remove fixed_auto_queue_size threadpool type (#52280)
* Remove fixed_auto_queue_size threadpool type

* Remove less

* compilation fix

* weaken assertion to accommodate tests that mock threadpool
ywelsch authored Feb 14, 2020
1 parent cecee07 commit a9afdd7
Showing 32 changed files with 316 additions and 1,134 deletions.
1 change: 1 addition & 0 deletions docs/reference/migration/migrate_8_0.asciidoc
@@ -81,5 +81,6 @@ include::migrate_8_0/http.asciidoc[]
include::migrate_8_0/reindex.asciidoc[]
include::migrate_8_0/search.asciidoc[]
include::migrate_8_0/settings.asciidoc[]
include::migrate_8_0/threadpool.asciidoc[]
include::migrate_8_0/indices.asciidoc[]
include::migrate_8_0/api.asciidoc[]
10 changes: 10 additions & 0 deletions docs/reference/migration/migrate_8_0/threadpool.asciidoc
@@ -0,0 +1,10 @@
[float]
[[breaking_80_threadpool_changes]]
=== Thread pool changes

[float]
==== Removal of the `fixed_auto_queue_size` thread pool type

The `fixed_auto_queue_size` thread pool type, previously marked as an
experimental feature, was deprecated in 7.x and has been removed in 8.0.
The `search` and `search_throttled` thread pools now have the `fixed` type.
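For illustration only (this sketch is not part of the commit): a `search` pool that previously tuned the auto-queue behaviour keeps just `size` and `queue_size` after upgrading, because `min_queue_size`, `max_queue_size`, `auto_queue_frame_size`, and `target_response_time` no longer exist and would have to be dropped from the configuration.

[source,yaml]
--------------------------------------------------
thread_pool:
    search:
        size: 30
        queue_size: 500
--------------------------------------------------
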
53 changes: 3 additions & 50 deletions docs/reference/modules/threadpool.asciidoc
@@ -15,14 +15,13 @@ There are several thread pools, but the important ones include:

`search`::
For count/search/suggest operations. Thread pool type is
`fixed_auto_queue_size` with a size of
`int((# of available_processors * 3) / 2) + 1`, and initial queue_size of
`fixed` with a size of
`int((# of available_processors * 3) / 2) + 1`, and queue_size of
`1000`.

[[search-throttled]]`search_throttled`::
For count/search/suggest/get operations on `search_throttled` indices.
Thread pool type is `fixed_auto_queue_size` with a size of `1`, and initial
queue_size of `100`.
Thread pool type is `fixed` with a size of `1`, and queue_size of `100`.

`get`::
For get operations. Thread pool type is `fixed`
@@ -119,52 +118,6 @@ thread_pool:
queue_size: 1000
--------------------------------------------------
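As an illustrative sketch in the same style (not part of this diff), the `search_throttled` pool, now also of the `fixed` type, is tuned with the same two settings; the values below simply restate its defaults of one thread and a queue of 100:

[source,yaml]
--------------------------------------------------
thread_pool:
    search_throttled:
        size: 1
        queue_size: 100
--------------------------------------------------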

[float]
[[fixed-auto-queue-size]]
==== `fixed_auto_queue_size`

experimental[]

The `fixed_auto_queue_size` thread pool holds a fixed size of threads to handle
the requests with a bounded queue for pending requests that have no threads to
service them. It's similar to the `fixed` threadpool, however, the `queue_size`
automatically adjusts according to calculations based on
https://en.wikipedia.org/wiki/Little%27s_law[Little's Law]. These calculations
will potentially adjust the `queue_size` up or down by 50 every time
`auto_queue_frame_size` operations have been completed.

The `size` parameter controls the number of threads.

The `queue_size` allows to control the initial size of the queue of pending
requests that have no threads to execute them.

The `min_queue_size` setting controls the minimum amount the `queue_size` can be
adjusted to.

The `max_queue_size` setting controls the maximum amount the `queue_size` can be
adjusted to.

The `auto_queue_frame_size` setting controls the number of operations during
which measurement is taken before the queue is adjusted. It should be large
enough that a single operation cannot unduly bias the calculation.

The `target_response_time` is a time value setting that indicates the targeted
average response time for tasks in the thread pool queue. If tasks are routinely
above this time, the thread pool queue will be adjusted down so that tasks are
rejected.

[source,yaml]
--------------------------------------------------
thread_pool:
search:
size: 30
queue_size: 500
min_queue_size: 10
max_queue_size: 1000
auto_queue_frame_size: 2000
target_response_time: 1s
--------------------------------------------------

[float]
[[scaling]]
==== `scaling`
@@ -71,7 +71,7 @@ public void testExecutionErrorOnDirectExecutorService() throws InterruptedExcept

public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), randomBoolean());
try {
checkExecutionError(getExecuteRunner(fixedExecutor));
checkExecutionError(getSubmitRunner(fixedExecutor));
@@ -91,17 +91,6 @@ public void testExecutionErrorOnScalingESThreadPoolExecutor() throws Interrupted
}
}

public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(autoQueueFixedExecutor));
checkExecutionError(getSubmitRunner(autoQueueFixedExecutor));
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
@@ -180,7 +169,7 @@ public void testExecutionExceptionOnDirectExecutorService() throws InterruptedEx

public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), randomBoolean());
try {
checkExecutionException(getExecuteRunner(fixedExecutor), true);
checkExecutionException(getSubmitRunner(fixedExecutor), false);
Expand All @@ -200,18 +189,6 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru
}
}

public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), true);
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
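The tests above now pass an extra boolean to `EsExecutors.newFixed`, the new `trackEWMA` flag introduced by this commit. The following sketch is an illustration, not code from the commit: the class name, pool name, and sizes are made up, the signatures come from the diffs in this commit, and it assumes it compiles inside the server codebase where `new ThreadContext(Settings.EMPTY)` is available as usual.

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class NewFixedUsageSketch {
    public static void main(String[] args) {
        ThreadContext context = new ThreadContext(Settings.EMPTY);
        // trackEWMA = true: tasks are wrapped in TimedRunnable and their execution times feed an EWMA
        EsThreadPoolExecutor executor = EsExecutors.newFixed(
            "sketch", 2, 100, EsExecutors.daemonThreadFactory("sketch"), context, true);
        try {
            executor.execute(() -> System.out.println("hello from the fixed pool"));
            // the EWMA-tracking variant exposes the moving average of task execution time;
            // the value reflects only tasks that have completed by the time of the call
            double ewmaNanos = ((EWMATrackingEsThreadPoolExecutor) executor).getTaskExecutionEWMA();
            System.out.println("task execution EWMA (nanos): " + ewmaNanos);
        } finally {
            ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
        }
    }
}
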
@@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.unit.TimeValue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* An extension to thread pool executor, which tracks the exponentially weighted moving average of the task execution time.
*/
public final class EWMATrackingEsThreadPoolExecutor extends EsThreadPoolExecutor {

// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
public static double EWMA_ALPHA = 0.3;

private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;

EWMATrackingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, Function<Runnable, WrappedRunnable> runnableWrapper,
ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler, contextHolder);
this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
}

@Override
protected Runnable wrapRunnable(Runnable command) {
return super.wrapRunnable(this.runnableWrapper.apply(command));
}

@Override
protected Runnable unwrap(Runnable runnable) {
final Runnable unwrapped = super.unwrap(runnable);
if (unwrapped instanceof WrappedRunnable) {
return ((WrappedRunnable) unwrapped).unwrap();
} else {
return unwrapped;
}
}

/**
* Returns the exponentially weighted moving average of the task execution time
*/
public double getTaskExecutionEWMA() {
return executionEWMA.getAverage();
}

/**
* Returns the current queue size (operations that are queued)
*/
public int getCurrentQueueSize() {
return getQueue().size();
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// A task has been completed, it has left the building. We should now be able to get the
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when t is null.
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
final boolean failedOrRejected = timedRunnable.getFailedOrRejected();
final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1) :
"expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: " + taskExecutionNanos +
", failedOrRejected: " + failedOrRejected;
if (taskExecutionNanos != -1) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
}

@Override
protected void appendThreadPoolExecutorDetails(StringBuilder sb) {
sb.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())).append(", ");
}

}
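A brief sketch of the moving average itself (illustrative, not part of the commit): the executor feeds completed task times into `ExponentiallyWeightedMovingAverage` with `alpha = 0.3`, so, assuming the conventional EWMA update `avg = alpha * sample + (1 - alpha) * avg`, each sample moves the average 30% of the way toward the newest observation. The constructor arguments and method calls below mirror those used by the class above; the sample timings are invented.

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;

public class EwmaBehaviourSketch {
    public static void main(String[] args) {
        // same arguments as the executor: alpha 0.3, initial average 0
        ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.3, 0);
        long[] taskNanos = {1_000_000L, 1_000_000L, 5_000_000L}; // hypothetical task timings
        for (long nanos : taskNanos) {
            ewma.addValue(nanos);
            System.out.println("EWMA after a " + nanos + "ns task: " + ewma.getAverage());
        }
    }
}
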
@@ -24,7 +24,6 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;

import java.util.Arrays;
@@ -83,38 +82,20 @@ public static EsThreadPoolExecutor newScaling(String name, int min, int max, lon
}

public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
ThreadFactory threadFactory, ThreadContext contextHolder) {
ThreadFactory threadFactory, ThreadContext contextHolder, boolean trackEWMA) {
BlockingQueue<Runnable> queue;
if (queueCapacity < 0) {
queue = ConcurrentCollections.newBlockingQueue();
} else {
queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
}
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
queue, threadFactory, new EsAbortPolicy(), contextHolder);
}

/**
* Return a new executor that will automatically adjust the queue size based on queue throughput.
*
* @param size number of fixed threads to use for executing tasks
* @param initialQueueCapacity initial size of the executor queue
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
*/
public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
ThreadFactory threadFactory, ThreadContext contextHolder) {
if (initialQueueCapacity <= 0) {
throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
initialQueueCapacity);
if (trackEWMA) {
return new EWMATrackingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
queue, TimedRunnable::new, threadFactory, new EsAbortPolicy(), contextHolder);
} else {
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
queue, threadFactory, new EsAbortPolicy(), contextHolder);
}
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
new EsAbortPolicy(), contextHolder);
}

/**
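For callers of these factory methods, the change boils down to a signature swap (an illustrative summary, not text from the commit); the removed and replacement calls below use placeholder argument names taken from the parameter lists in this diff:

// Before this commit (removed):
EsExecutors.newAutoQueueFixed(name, size, initialQueueCapacity, minQueueSize,
    maxQueueSize, frameSize, targetedResponseTime, threadFactory, contextHolder);

// After this commit:
EsExecutors.newFixed(name, size, queueCapacity, threadFactory, contextHolder, trackEWMA);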