Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add threadpool wait time metric #9681

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand All @@ -94,4 +95,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
Original file line number Diff line number Diff line change
@@ -1,3 +1,30 @@
"Test cat thread_pool total_wait_time output":
- skip:
version: " - 3.0.0"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was supposed to be 2.99.99 as well....will fix this in the backport/bwc PRs 🤦‍♂️

reason: thread_pool total_wait_time stats were introduced in V_3.0.0

- do:
cat.thread_pool: {}

- match:
$body: |
/ #node_name name active queue rejected
^ (\S+ \s+ \S+ \s+ \d+ \s+ \d+ \s+ \d+ \n)+ $/
- do:
cat.thread_pool:
thread_pool_patterns: search,search_throttled,index_searcher,generic
h: name,total_wait_time,twt
v: true

- match:
$body: |
/^ id \s+ name \s+ total_wait_time \s+ twt \n
(\S+ \s+ search \s+ \d+s \s+ \d+ \n
\S+ \s+ search_throttled \s+ \d+s \s+ \d+ \n
\S+ \s+ index_searcher \s+ \d+s \s+ \d+ \n
\S+ \s+ generic \s+ -1 \s+ -1 \n)+ $/
---
"Test cat thread_pool output":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.stats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
Expand All @@ -26,6 +28,8 @@
import org.opensearch.search.SearchService;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPoolStats;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -35,6 +39,7 @@
import java.util.function.Function;

import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

Expand Down Expand Up @@ -307,6 +312,54 @@ public void testAvgConcurrencyIndexLevel() throws InterruptedException {
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);
}

/**
 * Checks that the {@code index_searcher} thread pool reports a positive cumulative wait time
 * in the node stats after a search has been executed with a raised target max slice count.
 * <p>
 * The slice-count override is restored in a {@code finally} block so that a failing search or
 * assertion does not leave the changed cluster setting behind.
 *
 * @throws Exception if any cluster, index or search request fails
 */
public void testThreadPoolWaitTime() throws Exception {
    final int numShards = 1;
    final String indexName = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
    createIndex(
        indexName,
        Settings.builder()
            .put(indexSettings())
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .build()
    );

    ensureGreen();

    // Refresh after every single document rather than once at the end.
    // NOTE(review): presumably this is to end up with several segments so the search is split into slices - confirm.
    for (int i = 0; i < 10; i++) {
        client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).get();
        refresh();
    }

    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 10))
        .execute()
        .actionGet();

    try {
        client().prepareSearch(indexName).execute().actionGet();

        NodesStatsRequestBuilder nodesStatsRequestBuilder = new NodesStatsRequestBuilder(
            client().admin().cluster(),
            NodesStatsAction.INSTANCE
        ).setNodesIds().all();
        NodesStatsResponse nodesStatsResponse = nodesStatsRequestBuilder.execute().actionGet();
        ThreadPoolStats threadPoolStats = nodesStatsResponse.getNodes().get(0).getThreadPool();

        // NOTE(review): if no index_searcher pool is reported, this loop asserts nothing and the test passes vacuously.
        for (ThreadPoolStats.Stats stats : threadPoolStats) {
            if (stats.getName().equals(ThreadPool.Names.INDEX_SEARCHER)) {
                assertThat(stats.getWaitTime().micros(), greaterThan(0L));
            }
        }
    } finally {
        // Always put the slice count back to the value the original test restored (2), even on failure.
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder().put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2))
            .execute()
            .actionGet();
    }
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,15 @@ protected Runnable wrapRunnable(Runnable command) {
protected Runnable unwrap(Runnable runnable) {
return contextHolder.unwrap(runnable);
}

/**
* Returns the cumulative time, in nanoseconds, that tasks have spent waiting in this thread pool.
* <p>
* This base implementation does not track wait time and always returns -1.
* {@link org.opensearch.threadpool.ThreadPoolStats} leaves the wait time fields out of its XContent output when the value is -1.
* ThreadPools that do support this metric should override this method. For example, {@link QueueResizingOpenSearchThreadPoolExecutor}
* does so using the {@link TimedRunnable} to get the difference between Runnable creation and the start of its execution.
*
* @return the cumulative pool wait time in nanoseconds, or -1 if this executor does not track it
*/
public long getPoolWaitTimeNanos() {
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +28,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
private final ResizableBlockingQueue<Runnable> workQueue;
private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

/**
* Create new resizable at runtime thread pool executor
Expand Down Expand Up @@ -101,6 +103,7 @@ public final class QueueResizableOpenSearchThreadPoolExecutor extends OpenSearch
this.workQueue = workQueue;
this.runnableWrapper = runnableWrapper;
this.executionEWMA = new ExponentiallyWeightedMovingAverage(ewmaAlpha, 0);
this.poolWaitTime = new CounterMetric();
}

@Override
Expand Down Expand Up @@ -156,6 +159,7 @@ protected void afterExecute(Runnable r, Throwable t) {
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());
}

/**
Expand All @@ -173,4 +177,9 @@ public synchronized int resize(int capacity) {
capacity
);
}

/**
* Returns the current value of the {@code poolWaitTime} counter, which {@code afterExecute} increments
* with the wait time, in nanoseconds, reported by each completed timed runnable.
* <p>
* NOTE(review): the timed runnable's wait time can be -1 when its start time was never recorded, and that
* value appears to be added to the counter as-is, which would lower the total by one - confirm whether
* such tasks should be skipped instead.
*
* @return the accumulated wait time in nanoseconds
*/
@Override
public long getPoolWaitTimeNanos() {
return poolWaitTime.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.unit.TimeValue;

import java.util.Locale;
Expand Down Expand Up @@ -66,6 +67,7 @@
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
private final ExponentiallyWeightedMovingAverage executionEWMA;
private final CounterMetric poolWaitTime;

private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
Expand Down Expand Up @@ -97,6 +99,7 @@
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
this.poolWaitTime = new CounterMetric();
logger.debug(
"thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
getName(),
Expand Down Expand Up @@ -190,6 +193,7 @@
// taskExecutionNanos may be -1 if the task threw an exception
executionEWMA.addValue(taskExecutionNanos);
}
poolWaitTime.inc(timedRunnable.getWaitTimeNanos());

if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
Expand Down Expand Up @@ -290,4 +294,8 @@
sb.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
}

@Override
public long getPoolWaitTimeNanos() {
return poolWaitTime.count();

Check warning on line 299 in server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/util/concurrent/QueueResizingOpenSearchThreadPoolExecutor.java#L299

Added line #L299 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@
return Math.max(finishTimeNanos - startTimeNanos, 1);
}

long getWaitTimeNanos() {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
if (startTimeNanos == -1) {
// The start time was never recorded (there must have been an exception thrown), so the wait time is unknown (-1)
return -1;

Check warning on line 113 in server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/util/concurrent/TimedRunnable.java#L113

Added line #L113 was not covered by tests
}
return Math.max(startTimeNanos - creationTimeNanos, 1);
jed326 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* If the task was failed or rejected, return true.
* Otherwise, false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@
table.addCell("rejected", "alias:r;default:true;text-align:right;desc:number of rejected tasks");
table.addCell("largest", "alias:l;default:false;text-align:right;desc:highest number of seen active threads");
table.addCell("completed", "alias:c;default:false;text-align:right;desc:number of completed tasks");
table.addCell(

Check warning on line 166 in server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestThreadPoolAction.java#L166

Added line #L166 was not covered by tests
"total_wait_time",
"alias:twt;default:false;text-align:right;desc:total time tasks spent waiting in thread_pool queue"
);
table.addCell("core", "alias:cr;default:false;text-align:right;desc:core number of threads in a scaling thread pool");
table.addCell("max", "alias:mx;default:false;text-align:right;desc:maximum number of threads in a scaling thread pool");
table.addCell("size", "alias:sz;default:false;text-align:right;desc:number of threads in a fixed thread pool");
Expand Down Expand Up @@ -267,6 +271,7 @@
table.addCell(poolStats == null ? null : poolStats.getRejected());
table.addCell(poolStats == null ? null : poolStats.getLargest());
table.addCell(poolStats == null ? null : poolStats.getCompleted());
table.addCell(poolStats == null ? null : poolStats.getWaitTime());
table.addCell(core);
table.addCell(max);
table.addCell(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,21 @@ public ThreadPoolStats stats() {
long rejected = -1;
int largest = -1;
long completed = -1;
if (holder.executor() instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) holder.executor();
long waitTimeNanos = -1;
if (holder.executor() instanceof OpenSearchThreadPoolExecutor) {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor();
threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
largest = threadPoolExecutor.getLargestPoolSize();
completed = threadPoolExecutor.getCompletedTaskCount();
waitTimeNanos = threadPoolExecutor.getPoolWaitTimeNanos();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
if (rejectedExecutionHandler instanceof XRejectedExecutionHandler) {
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
}
}
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed));
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos));
}
return new ThreadPoolStats(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.threadpool;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -65,15 +67,17 @@
private final long rejected;
private final int largest;
private final long completed;
private final long waitTimeNanos;

public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed) {
/**
* Creates the statistics snapshot for a single named thread pool.
*
* @param name          name of the thread pool
* @param threads       number of threads in the pool
* @param queue         number of tasks in the pool's queue
* @param active        number of active threads
* @param rejected      number of rejected tasks
* @param largest       largest pool size seen
* @param completed     number of completed tasks
* @param waitTimeNanos cumulative wait time of the pool in nanoseconds; -1 means the pool does not
*                      track it, and the wait time fields are then omitted from the XContent output
*/
public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, long waitTimeNanos) {
this.name = name;
this.threads = threads;
this.queue = queue;
this.active = active;
this.rejected = rejected;
this.largest = largest;
this.completed = completed;
this.waitTimeNanos = waitTimeNanos;
}

public Stats(StreamInput in) throws IOException {
Expand All @@ -84,6 +88,7 @@
rejected = in.readLong();
largest = in.readInt();
completed = in.readLong();
waitTimeNanos = in.getVersion().onOrAfter(Version.V_3_0_0) ? in.readLong() : -1;
}

@Override
Expand All @@ -95,6 +100,9 @@
out.writeLong(rejected);
out.writeInt(largest);
out.writeLong(completed);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(waitTimeNanos);
}
}

public String getName() {
Expand Down Expand Up @@ -125,6 +133,14 @@
return this.completed;
}

public TimeValue getWaitTime() {
return TimeValue.timeValueNanos(waitTimeNanos);

Check warning on line 137 in server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java#L137

Added line #L137 was not covered by tests
}

/**
* Returns the raw wait time value held by these stats, in nanoseconds.
*
* @return the wait time in nanoseconds; -1 is the value used when the wait time is not available
*/
public long getWaitTimeNanos() {
return waitTimeNanos;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);
Expand All @@ -146,6 +162,12 @@
if (completed != -1) {
builder.field(Fields.COMPLETED, completed);
}
if (waitTimeNanos != -1) {
if (builder.humanReadable()) {
builder.field(Fields.WAIT_TIME, getWaitTime());

Check warning on line 167 in server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java#L167

Added line #L167 was not covered by tests
}
builder.field(Fields.WAIT_TIME_NANOS, getWaitTimeNanos());
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -197,6 +219,8 @@
static final String REJECTED = "rejected";
static final String LARGEST = "largest";
static final String COMPLETED = "completed";
static final String WAIT_TIME = "total_wait_time";
jed326 marked this conversation as resolved.
Show resolved Hide resolved
static final String WAIT_TIME_NANOS = "total_wait_time_in_nanos";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
randomIntBetween(1, 1000),
randomNonNegativeLong(),
randomIntBetween(1, 1000),
randomIntBetween(1, 1000)
randomIntBetween(1, 1000),
randomIntBetween(-1, 10)
)
);
}
Expand Down
Loading
Loading