Skip to content

Commit

Permalink
Bubble-up exceptions from scheduler (#38441)
Browse files Browse the repository at this point in the history
Instead of logging warnings, we now rethrow exceptions thrown inside
scheduled/submitted tasks. In production they are still logged as warnings
(by the uncaught exception handler), with the added benefit that if they are
thrown during unit/integration test runs, the test will be flagged as an error.

Fixed NPE in GlobalCheckPointListeners that caused CCR tests
(IndexFollowingIT and likely others) to fail.

This is a continuation of #38014

Backports #38317
  • Loading branch information
henningandersen authored Feb 5, 2019
1 parent 401c45c commit d0ac828
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,29 @@

package org.elasticsearch.threadpool;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.junit.After;
import org.junit.Before;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -260,6 +257,50 @@ public void testExecutionExceptionOnScheduler() throws InterruptedException {
}
}

/**
 * Runs the shared deprecated-schedule verification against {@code ThreadPool.schedule(...)},
 * using a random delay of up to 10ms and the {@code SAME} executor name.
 */
@SuppressForbidden(reason = "this tests that the deprecated method still works")
public void testDeprecatedSchedule() throws ExecutionException, InterruptedException {
    final BiFunction<ThreadPool, Runnable, ScheduledFuture<?>> viaSchedule = (pool, task) ->
        pool.schedule(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, task);
    verifyDeprecatedSchedule(viaSchedule);
}

/**
 * Runs the shared deprecated-schedule verification against {@code ThreadPool.scheduleDeprecated(...)},
 * using a random delay of up to 10ms and the {@code SAME} executor name.
 */
public void testThreadPoolScheduleDeprecated() throws ExecutionException, InterruptedException {
    final BiFunction<ThreadPool, Runnable, ScheduledFuture<?>> viaScheduleDeprecated = (pool, task) ->
        pool.scheduleDeprecated(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, task);
    verifyDeprecatedSchedule(viaScheduleDeprecated);
}

/**
 * Shared verification for the scheduling entry points under test.
 * <p>
 * First schedules a task that completes normally and checks that it ran. Then schedules a task that
 * throws {@link IllegalArgumentException} and checks that (a) {@code get()} on the returned future
 * fails with an {@link ExecutionException} whose cause is that exception, and (b) the default
 * uncaught exception handler is invoked within 30 seconds.
 * <p>
 * The default uncaught exception handler is swapped for the duration of the check and restored afterwards.
 *
 * @param scheduleFunction schedules the given runnable on the given thread pool and returns its future
 * @throws InterruptedException if interrupted while waiting on a future or latch
 * @throws ExecutionException if the normally-completing task unexpectedly fails
 */
private void verifyDeprecatedSchedule(BiFunction<ThreadPool,
    Runnable, ScheduledFuture<?>> scheduleFunction) throws InterruptedException, ExecutionException {
    final Thread.UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
    final CountDownLatch uncaughtSeen = new CountDownLatch(1);
    // Any exception reaching the default handler releases the latch awaited at the end of the check.
    Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> uncaughtSeen.countDown());
    try {
        final ThreadPool pool = new TestThreadPool("test");
        final CountDownLatch pendingRuns = new CountDownLatch(1);
        try {
            // Happy path: the task must have run by the time get() returns.
            final ScheduledFuture<?> successful = scheduleFunction.apply(pool, pendingRuns::countDown);
            successful.get();
            assertEquals(0, pendingRuns.getCount());

            // Failure path: the task's exception must surface through the future.
            final Runnable failingTask = () -> {
                throw new IllegalArgumentException("FAIL");
            };
            final ExecutionException thrown = expectThrows(ExecutionException.class,
                "schedule(...).get() must throw exception from runnable",
                () -> scheduleFunction.apply(pool, failingTask).get());

            assertEquals(IllegalArgumentException.class, thrown.getCause().getClass());
            assertTrue(uncaughtSeen.await(30, TimeUnit.SECONDS));
        } finally {
            ThreadPool.terminate(pool, 10, TimeUnit.SECONDS);
        }
    } finally {
        Thread.setDefaultUncaughtExceptionHandler(previousHandler);
    }
}

private Runnable delayMillis(Runnable r, int ms) {
return () -> {
try {
Expand Down Expand Up @@ -369,60 +410,26 @@ private void runExecutionTest(

final CountDownLatch supplierLatch = new CountDownLatch(1);

Runnable job = () -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
};

// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
final MockLogAppender appender = new MockLogAppender();
appender.addExpectation(
new MockLogAppender.LoggingExpectation() {
@Override
public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
}

@Override
public void assertMatched() {
try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

appender.start();
Loggers.addAppender(schedulerLogger, appender);
try {
try {
runner.accept(job);
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();
supplierLatch.await();

if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}
} finally {
Loggers.removeAppender(schedulerLogger, appender);
appender.stop();
if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}

consumer.accept(Optional.ofNullable(throwableReference.get()));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo
* before we could be cancelled by the notification. In this case, our listener here would
* not be in the map and we should not fire the timeout logic.
*/
removed = listeners.remove(listener).v2() != null;
removed = listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
Expand Down
19 changes: 9 additions & 10 deletions server/src/main/java/org/elasticsearch/threadpool/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -47,8 +47,8 @@ public interface Scheduler {
/**
* Create a scheduler that can be used client side. Server side, please use <code>ThreadPool.schedule</code> instead.
*
* Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started
* using execute, submit and schedule.
* Note that if any scheduled job fails with an exception, the exception will bubble up to the uncaught exception handler, where it
* will be logged as a warning. This includes jobs started using execute, submit and schedule.
* @param settings the settings to use
* @return executor
*/
Expand Down Expand Up @@ -272,7 +272,8 @@ public void onAfter() {
}

/**
* This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks
* This subclass ensures that Throwable instances of type Error as well as Exception thrown in submitted/scheduled tasks properly
* bubble up to the uncaught exception handler
*/
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class);
Expand All @@ -294,12 +295,10 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) {

// Hook invoked after each task finishes; used here to surface failures of submitted/scheduled tasks.
// NOTE(review): this span comes from a flattened diff view. The logger.warn(...) branch looks like the
// pre-change implementation and the reThrowIfNotNull(...) path like its replacement - confirm against
// the actual file before treating both as live code.
@Override
protected void afterExecute(Runnable r, Throwable t) {
// presumably yields the exception captured by the task, rethrowing Errors directly - confirm in EsExecutors
Throwable exception = EsExecutors.rethrowErrors(r);
if (exception != null) {
// the message supplier is a lambda, so the ParameterizedMessage is only built if the warning is emitted
logger.warn(() ->
new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()),
exception);
}
// a non-null t means the task terminated by throwing; nothing further to rethrow from here
if (t != null) return;
// Scheduler only allows Runnables so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
// accept the wrapped exception in the output.
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.Counter;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -497,7 +498,16 @@ class ThreadedRunnable implements Runnable {

// Hands the wrapped runnable over to the target executor.
// NOTE(review): this span comes from a flattened diff view. The bare execute(...) call directly below
// looks like the pre-change line and the try/catch its replacement - confirm against the actual file,
// since taken literally the runnable would be submitted twice.
@Override
public void run() {
executor.execute(runnable);
try {
executor.execute(runnable);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
// rejection caused by executor shutdown is only logged at debug level and not propagated
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down",
runnable, executor), e);
} else {
// any other rejection is propagated to the caller unchanged
throw e;
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -29,12 +28,9 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -157,37 +153,4 @@ public void testScheduledOnScheduler() throws InterruptedException {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}

/**
 * Checks {@code ThreadPool.schedule(...)} through the shared deprecated-schedule verification,
 * with a random delay of up to 10ms on the {@code SAME} executor name.
 */
@SuppressForbidden(reason = "this tests that the deprecated method still works")
public void testDeprecatedSchedule() throws ExecutionException, InterruptedException {
    final BiFunction<ThreadPool, Runnable, ScheduledFuture<?>> scheduler = (tp, command) -> {
        final TimeValue delay = TimeValue.timeValueMillis(randomInt(10));
        return tp.schedule(delay, ThreadPool.Names.SAME, command);
    };
    verifyDeprecatedSchedule(scheduler);
}

/**
 * Checks {@code ThreadPool.scheduleDeprecated(...)} through the shared deprecated-schedule verification,
 * with a random delay of up to 10ms on the {@code SAME} executor name.
 */
public void testThreadPoolScheduleDeprecated() throws ExecutionException, InterruptedException {
    final BiFunction<ThreadPool, Runnable, ScheduledFuture<?>> scheduler = (tp, command) -> {
        final TimeValue delay = TimeValue.timeValueMillis(randomInt(10));
        return tp.scheduleDeprecated(delay, ThreadPool.Names.SAME, command);
    };
    verifyDeprecatedSchedule(scheduler);
}

/**
 * Shared verification for the scheduling entry points under test.
 * <p>
 * Schedules a task that completes normally and checks that it ran, then schedules a task that throws
 * {@link IllegalArgumentException} and checks that {@code get()} on the returned future fails with an
 * {@link ExecutionException} whose cause is that exception. The thread pool is terminated afterwards.
 *
 * @param scheduleFunction schedules the given runnable on the given thread pool and returns its future
 * @throws InterruptedException if interrupted while waiting on a future
 * @throws ExecutionException if the normally-completing task unexpectedly fails
 */
private void verifyDeprecatedSchedule(BiFunction<ThreadPool,
    Runnable, ScheduledFuture<?>> scheduleFunction) throws InterruptedException, ExecutionException {
    final ThreadPool pool = new TestThreadPool("test");
    final CountDownLatch pendingRuns = new CountDownLatch(1);
    try {
        // Happy path: the task must have run by the time get() returns.
        final ScheduledFuture<?> successful = scheduleFunction.apply(pool, pendingRuns::countDown);
        successful.get();
        assertEquals(0, pendingRuns.getCount());

        // Failure path: the task's exception must surface through the future.
        final Runnable failingTask = () -> {
            throw new IllegalArgumentException("FAIL");
        };
        final ExecutionException thrown = expectThrows(ExecutionException.class,
            "schedule(...).get() must throw exception from runnable",
            () -> scheduleFunction.apply(pool, failingTask).get());

        assertEquals(IllegalArgumentException.class, thrown.getCause().getClass());
    } finally {
        ThreadPool.terminate(pool, 10, TimeUnit.SECONDS);
    }
}

}

0 comments on commit d0ac828

Please sign in to comment.