Skip to content

Commit

Permalink
Include queued submissions in executor.queued
Browse files Browse the repository at this point in the history
This was missed when the metric was originally implemented for ForkJoinPool. Also aligns the description with the other `executor.queued` gauge for `ThreadPoolExecutor`.

Resolves gh-5650
  • Loading branch information
shakuzen committed Dec 5, 2024
1 parent 7d34b2e commit c5514c1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,11 @@ private void monitor(MeterRegistry registry, ForkJoinPool fj) {
+ "underestimates the actual total number of steals when the pool " + "is not quiescent")
.register(registry);

Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
Gauge
.builder(metricPrefix + "executor.queued", fj,
pool -> pool.getQueuedTaskCount() + pool.getQueuedSubmissionCount())
.tags(tags)
.description("An estimate of the total number of tasks currently held in queues by worker threads")
.description("The approximate number of tasks that are queued for execution")
.register(registry);

Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.params.provider.CsvSource;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.AssertionsForClassTypes.*;

Expand Down Expand Up @@ -259,6 +260,32 @@ void monitorScheduledExecutorServiceWithRepetitiveTasks(String metricPrefix, Str
assertThat(registry.get(expectedMetricPrefix + "executor.idle").tags(userTags).timer().count()).isEqualTo(0L);
}

@Test
@Issue("#5650")
void queuedSubmissionsAreIncludedInExecutorQueuedMetric() {
ForkJoinPool pool = new ForkJoinPool(1, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false, 1, 1, 1,
a -> true, 555, TimeUnit.MILLISECONDS);
ExecutorServiceMetrics.monitor(registry, pool, "myForkJoinPool");
AtomicBoolean busy = new AtomicBoolean(true);

// will be an active task
pool.execute(() -> {
while (busy.get()) {
}
});

// will be queued for submission
pool.execute(() -> {
});
pool.execute(() -> {
});

double queued = registry.get("executor.queued").tag("name", "myForkJoinPool").gauge().value();
busy.set(false);

assertThat(queued).isEqualTo(2.0);
}

@SuppressWarnings("unchecked")
private <T extends Executor> T monitorExecutorService(String executorName, String metricPrefix, T exec) {
if (metricPrefix == null) {
Expand Down

0 comments on commit c5514c1

Please sign in to comment.