Skip to content

Commit

Permalink
Integrate threadpool scheduling with AbstractRunnable (#106542) (#1…
Browse files Browse the repository at this point in the history
…06548)

Today `ThreadPool#scheduleWithFixedDelay` does not interact as expected
with `AbstractRunnable`: if the task fails or is rejected then this
isn't passed back to the relevant callback, and the task cannot specify
that it should be force-executed. This commit fixes that.
  • Loading branch information
DaveCTurner authored Mar 20, 2024
1 parent 21f1123 commit d281df7
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 8 deletions.
23 changes: 20 additions & 3 deletions server/src/main/java/org/elasticsearch/threadpool/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ static boolean awaitTermination(
* not be interrupted.
*/
default Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, Executor executor) {
var runnable = new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {});
var runnable = new ReschedulingRunnable(command, interval, executor, this, e -> {}, e -> {});
runnable.start();
return runnable;
}
Expand Down Expand Up @@ -226,13 +226,25 @@ public void doRun() {

@Override
public void onFailure(Exception e) {
failureConsumer.accept(e);
try {
if (runnable instanceof AbstractRunnable abstractRunnable) {
abstractRunnable.onFailure(e);
}
} finally {
failureConsumer.accept(e);
}
}

@Override
public void onRejection(Exception e) {
run = false;
rejectionConsumer.accept(e);
try {
if (runnable instanceof AbstractRunnable abstractRunnable) {
abstractRunnable.onRejection(e);
}
} finally {
rejectionConsumer.accept(e);
}
}

@Override
Expand All @@ -247,6 +259,11 @@ public void onAfter() {
}
}

@Override
public boolean isForceExecution() {
return runnable instanceof AbstractRunnable abstractRunnable && abstractRunnable.isForceExecution();
}

@Override
public String toString() {
return "ReschedulingRunnable{" + "runnable=" + runnable + ", interval=" + interval + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,11 +597,14 @@ public void scheduleUnlessShuttingDown(TimeValue delay, Executor executor, Runna
}

public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, Executor executor) {
var runnable = new ReschedulingRunnable(command, interval, executor, this, (e) -> {
if (logger.isDebugEnabled()) {
logger.debug(() -> format("scheduled task [%s] was rejected on thread pool [%s]", command, executor), e);
}
}, (e) -> logger.warn(() -> format("failed to run scheduled task [%s] on thread pool [%s]", command, executor), e));
var runnable = new ReschedulingRunnable(
command,
interval,
executor,
this,
e -> logger.debug(() -> format("scheduled task [%s] was rejected on thread pool [%s]", command, executor), e),
e -> logger.warn(() -> format("failed to run scheduled task [%s] on thread pool [%s]", command, executor), e)
);
runnable.start();
return runnable;
}
Expand Down
154 changes: 154 additions & 0 deletions server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
Expand All @@ -25,7 +29,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT;
import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DO_NOT_TRACK;
import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCalledRecursively;
Expand Down Expand Up @@ -405,4 +412,151 @@ public void testSearchWorkedThreadPool() {
assertTrue(terminate(threadPool));
}
}

public void testScheduledOneShotRejection() {
final var name = "fixed-bounded";
final var threadPool = new TestThreadPool(
getTestName(),
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), randomFrom(DEFAULT, DO_NOT_TRACK))
);

final var future = new PlainActionFuture<Void>();
final var latch = new CountDownLatch(1);
try {
blockExecution(threadPool.executor(name), latch);
threadPool.schedule(
ActionRunnable.run(future, () -> fail("should not execute")),
TimeValue.timeValueMillis(between(1, 100)),
threadPool.executor(name)
);

expectThrows(EsRejectedExecutionException.class, () -> FutureUtils.get(future, 10, TimeUnit.SECONDS));
} finally {
latch.countDown();
assertTrue(terminate(threadPool));
}
}

public void testScheduledOneShotForceExecution() {
final var name = "fixed-bounded";
final var threadPool = new TestThreadPool(
getTestName(),
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), randomFrom(DEFAULT, DO_NOT_TRACK))
);

final var future = new PlainActionFuture<Void>();
final var latch = new CountDownLatch(1);
try {
blockExecution(threadPool.executor(name), latch);
threadPool.schedule(
forceExecution(ActionRunnable.run(future, () -> {})),
TimeValue.timeValueMillis(between(1, 100)),
threadPool.executor(name)
);

Thread.yield();
assertFalse(future.isDone());

latch.countDown();
FutureUtils.get(future, 10, TimeUnit.SECONDS); // shouldn't throw
} finally {
latch.countDown();
assertTrue(terminate(threadPool));
}
}

public void testScheduledFixedDelayRejection() {
final var name = "fixed-bounded";
final var threadPool = new TestThreadPool(
getTestName(),
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), randomFrom(DEFAULT, DO_NOT_TRACK))
);

final var future = new PlainActionFuture<Void>();
final var latch = new CountDownLatch(1);
try {
threadPool.scheduleWithFixedDelay(
ActionRunnable.wrap(future, ignored -> Thread.yield()),
TimeValue.timeValueMillis(between(1, 100)),
threadPool.executor(name)
);

while (future.isDone() == false) {
// might not block all threads the first time round if the scheduled runnable is running, so must keep trying
blockExecution(threadPool.executor(name), latch);
}
expectThrows(EsRejectedExecutionException.class, () -> FutureUtils.get(future));
} finally {
latch.countDown();
assertTrue(terminate(threadPool));
}
}

public void testScheduledFixedDelayForceExecution() {
final var name = "fixed-bounded";
final var threadPool = new TestThreadPool(
getTestName(),
new FixedExecutorBuilder(Settings.EMPTY, name, between(1, 5), between(1, 5), randomFrom(DEFAULT, DO_NOT_TRACK))
);

final var future = new PlainActionFuture<Void>();
final var latch = new CountDownLatch(1);
try {
blockExecution(threadPool.executor(name), latch);

threadPool.scheduleWithFixedDelay(
forceExecution(ActionRunnable.run(future, Thread::yield)),
TimeValue.timeValueMillis(between(1, 100)),
threadPool.executor(name)
);

assertFalse(future.isDone());

latch.countDown();
FutureUtils.get(future, 10, TimeUnit.SECONDS); // shouldn't throw
} finally {
latch.countDown();
assertTrue(terminate(threadPool));
}
}

private static AbstractRunnable forceExecution(AbstractRunnable delegate) {
return new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
delegate.onFailure(e);
}

@Override
protected void doRun() {
delegate.run();
}

@Override
public void onRejection(Exception e) {
delegate.onRejection(e);
}

@Override
public void onAfter() {
delegate.onAfter();
}

@Override
public boolean isForceExecution() {
return true;
}
};
}

private static void blockExecution(ExecutorService executor, CountDownLatch latch) {
while (true) {
try {
executor.execute(() -> safeAwait(latch));
} catch (EsRejectedExecutionException e) {
break;
}
}
}

}

0 comments on commit d281df7

Please sign in to comment.