Skip to content

Commit

Permalink
Handle scheduler exceptions (elastic#38014)
Browse files Browse the repository at this point in the history
Scheduler.schedule(...) would previously assume that caller handles
exception by calling get() on the returned ScheduledFuture.
schedule() now returns a ScheduledCancellable that no longer gives
access to the exception. Instead, any exception thrown out of a
scheduled Runnable is logged as a warning.

In this backport to 6.x, source backwards compatibility is maintained
and some of the changes has therefore not been carried out (notably
the signature change on Processor.Parameters.scheduler).

This is a continuation of elastic#28667, elastic#36137 and also fixes elastic#37708.
  • Loading branch information
henningandersen committed Feb 1, 2019
1 parent e30f6cc commit 09f661f
Show file tree
Hide file tree
Showing 53 changed files with 713 additions and 241 deletions.
3 changes: 3 additions & 0 deletions buildSrc/src/main/resources/forbidden/es-all-signatures.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,6 @@ java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)

@defaultMessage use Scheduler.schedule(Runnable, delay, executor) instead (mocking tests typically rely on that signature).
org.elasticsearch.threadpool.Scheduler#schedule(org.elasticsearch.common.unit.TimeValue, java.lang.String, java.lang.Runnable)
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void onFailure(Exception e) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -320,7 +319,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
worker.rethrottle(1);
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
// While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
Expand Down Expand Up @@ -439,7 +438,7 @@ public void testScrollDelay() throws Exception {
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
capturedDelay.set(delay);
capturedCommand.set(command);
return null;
Expand Down Expand Up @@ -615,7 +614,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
*/
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
/*
* This is called twice:
* 1. To schedule the throttling. When that happens we immediately cancel the task.
Expand All @@ -626,7 +625,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
}
return super.schedule(delay, name, command);
return super.schedule(command, delay, name);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -104,7 +103,7 @@ public ExecutorService executor(String name) {
}

@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
command.run();
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

package org.elasticsearch.threadpool;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.junit.After;
import org.junit.Before;

Expand All @@ -38,6 +44,7 @@
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -108,7 +115,12 @@ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws In
try {
checkExecutionError(getExecuteRunner(prioritizedExecutor));
checkExecutionError(getSubmitRunner(prioritizedExecutor));
// bias towards timeout
checkExecutionError(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r));
// race whether timeout or success (but typically biased towards success)
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r));
// bias towards no timeout.
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r));
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -170,10 +182,7 @@ public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedE
final boolean expectExceptionOnSchedule =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE
// scheduler just swallows the exception here
// TODO: bubble these exceptions up
&& ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT;
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
}
}
Expand Down Expand Up @@ -219,14 +228,19 @@ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37708")
public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);

// bias towards timeout
checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true);
// race whether timeout or success (but typically biased towards success)
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
// bias towards no timeout.
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true);
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
Expand All @@ -235,26 +249,39 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw
public void testExecutionExceptionOnScheduler() throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
try {
// scheduler just swallows the exceptions
// TODO: bubble these exceptions up
checkExecutionException(getExecuteRunner(scheduler), false);
checkExecutionException(getSubmitRunner(scheduler), false);
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false);
checkExecutionException(getExecuteRunner(scheduler), true);
// while submit does return a Future, we choose to log exceptions anyway,
// since this is the semi-internal SafeScheduledThreadPoolExecutor that is being used,
// which also logs exceptions for schedule calls.
checkExecutionException(getSubmitRunner(scheduler), true);
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), true);
} finally {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
}
}

private Runnable delayMillis(Runnable r, int ms) {
return () -> {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
r.run();
};
}

private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
logger.info("checking exception for {}", runner);
final Runnable runnable;
final boolean willThrow;
if (randomBoolean()) {
logger.info("checking direct exception for {}", runner);
runnable = () -> {
throw new IllegalStateException("future exception");
};
willThrow = expectException;
} else {
logger.info("checking abstract runnable exception for {}", runner);
runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand All @@ -275,6 +302,7 @@ protected void doRun() {
o -> {
assertEquals(willThrow, o.isPresent());
if (willThrow) {
if (o.get() instanceof Error) throw (Error) o.get();
assertThat(o.get(), instanceOf(IllegalStateException.class));
assertThat(o.get(), hasToString(containsString("future exception")));
}
Expand Down Expand Up @@ -313,7 +341,7 @@ Consumer<Runnable> getScheduleRunner(String executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable);
threadPool.schedule(runnable, randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor);
}

@Override
Expand All @@ -324,42 +352,77 @@ public String toString() {
}

private void runExecutionTest(
final Consumer<Runnable> runner,
final Runnable runnable,
final boolean expectThrowable,
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
final Consumer<Runnable> runner,
final Runnable runnable,
final boolean expectThrowable,
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1);

try {
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
assertTrue(expectThrowable);
throwableReference.set(e);
assertTrue("Only one message allowed", throwableReference.compareAndSet(null, e));
uncaughtExceptionHandlerLatch.countDown();
});

final CountDownLatch supplierLatch = new CountDownLatch(1);

try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
Runnable job = () -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
};

// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
final MockLogAppender appender = new MockLogAppender();
appender.addExpectation(
new MockLogAppender.LoggingExpectation() {
@Override
public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
}

@Override
public void assertMatched() {
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();
appender.start();
Loggers.addAppender(schedulerLogger, appender);
try {
try {
runner.accept(job);
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();

if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}
} finally {
Loggers.removeAppender(schedulerLogger, appender);
appender.stop();
}

consumer.accept(Optional.ofNullable(throwableReference.get()));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,15 @@ public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkRespons
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
return new Builder(consumer, listener,
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
buildScheduler(scheduledThreadPoolExecutor),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
}

private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
return (command, delay, executor) ->
Scheduler.wrapAsScheduledCancellable(scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
}

private final int bulkActions;
private final long bulkSize;

Expand Down Expand Up @@ -345,7 +350,9 @@ private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler
if (flushInterval == null) {
return new Scheduler.Cancellable() {
@Override
public void cancel() {}
public boolean cancel() {
return false;
}

@Override
public boolean isCancelled() {
Expand Down
14 changes: 8 additions & 6 deletions server/src/main/java/org/elasticsearch/action/bulk/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -121,7 +119,7 @@ static class RetryHandler implements ActionListener<BulkResponse> {
// needed to construct the next bulk request based on the response to the previous one
// volatile as we're called from a scheduled thread
private volatile BulkRequest currentBulkRequest;
private volatile ScheduledFuture<?> scheduledRequestFuture;
private volatile Scheduler.Cancellable retryCancellable;

RetryHandler(BackoffPolicy backoffPolicy, BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
ActionListener<BulkResponse> listener, Scheduler scheduler) {
Expand Down Expand Up @@ -155,7 +153,9 @@ public void onFailure(Exception e) {
try {
listener.onFailure(e);
} finally {
FutureUtils.cancel(scheduledRequestFuture);
if (retryCancellable != null) {
retryCancellable.cancel();
}
}
}

Expand All @@ -164,7 +164,7 @@ private void retry(BulkRequest bulkRequestForRetry) {
TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME);
}

private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
Expand Down Expand Up @@ -198,7 +198,9 @@ private void finishHim() {
try {
listener.onResponse(getAccumulatedResponse());
} finally {
FutureUtils.cancel(scheduledRequestFuture);
if (retryCancellable != null) {
retryCancellable.cancel();
}
}
}

Expand Down
Loading

0 comments on commit 09f661f

Please sign in to comment.