diff --git a/buildSrc/src/main/resources/forbidden/es-all-signatures.txt b/buildSrc/src/main/resources/forbidden/es-all-signatures.txt index 130984eb58f17..2ea46376ae3bf 100644 --- a/buildSrc/src/main/resources/forbidden/es-all-signatures.txt +++ b/buildSrc/src/main/resources/forbidden/es-all-signatures.txt @@ -50,3 +50,16 @@ java.nio.channels.SocketChannel#connect(java.net.SocketAddress) java.lang.Boolean#getBoolean(java.lang.String) org.apache.lucene.util.IOUtils @ use @org.elasticsearch.core.internal.io instead + +@defaultMessage use executors from org.elasticsearch.common.util.concurrent.EsExecutors instead which will properly bubble up Errors +java.util.concurrent.AbstractExecutorService#() +java.util.concurrent.ThreadPoolExecutor#(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue) +java.util.concurrent.ThreadPoolExecutor#(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory) +java.util.concurrent.ThreadPoolExecutor#(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.RejectedExecutionHandler) +java.util.concurrent.ThreadPoolExecutor#(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler) + +@defaultMessage extend org.elasticsearch.threadpool.Scheduler.SafeScheduledThreadPoolExecutor instead which will properly bubble up Errors +java.util.concurrent.ScheduledThreadPoolExecutor#(int) +java.util.concurrent.ScheduledThreadPoolExecutor#(int, java.util.concurrent.ThreadFactory) +java.util.concurrent.ScheduledThreadPoolExecutor#(int, java.util.concurrent.RejectedExecutionHandler) +java.util.concurrent.ScheduledThreadPoolExecutor#(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index da43927d1dfee..1754310c34f20 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -19,12 +19,21 @@ package org.elasticsearch.threadpool; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.test.ESTestCase; import org.junit.After; import org.junit.Before; import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -46,26 +55,275 @@ public void tearDownThreadPool() throws InterruptedException { terminate(threadPool); } - public void testExecutionException() throws InterruptedException { - runExecutionExceptionTest( - () -> { + public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException { + for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) { + checkExecutionError(getExecuteRunner(threadPool.executor(executor))); + checkExecutionError(getSubmitRunner(threadPool.executor(executor))); + checkExecutionError(getScheduleRunner(executor)); + } + } + + public void testExecutionErrorOnDirectExecutorService() throws InterruptedException { + final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService(); + checkExecutionError(getExecuteRunner(directExecutorService)); + checkExecutionError(getSubmitRunner(directExecutorService)); + } + + public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedException { + final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1, + EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + try { + checkExecutionError(getExecuteRunner(fixedExecutor)); + checkExecutionError(getSubmitRunner(fixedExecutor)); + } finally { + ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionErrorOnScalingESThreadPoolExecutor() throws InterruptedException { + final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1, + 10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + try { + checkExecutionError(getExecuteRunner(scalingExecutor)); + checkExecutionError(getSubmitRunner(scalingExecutor)); + } finally { + ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException { + final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1, + 1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + try { + checkExecutionError(getExecuteRunner(autoQueueFixedExecutor)); + checkExecutionError(getSubmitRunner(autoQueueFixedExecutor)); + } finally { + ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException { + final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test", + EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler()); + try { + checkExecutionError(getExecuteRunner(prioritizedExecutor)); + checkExecutionError(getSubmitRunner(prioritizedExecutor)); + checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r)); + } finally { + ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionErrorOnScheduler() throws InterruptedException { + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + try { + checkExecutionError(getExecuteRunner(scheduler)); + checkExecutionError(getSubmitRunner(scheduler)); + checkExecutionError(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS)); + } finally { + Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS); + } + } + + private void checkExecutionError(Consumer runner) throws InterruptedException { + logger.info("checking error for {}", runner); + final Runnable runnable; + if (randomBoolean()) { + runnable = () -> { + throw new Error("future error"); + }; + } else { + runnable = new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + + } + + @Override + protected void doRun() { throw new Error("future error"); - }, - true, - o -> { - assertTrue(o.isPresent()); - assertThat(o.get(), instanceOf(Error.class)); - assertThat(o.get(), hasToString(containsString("future error"))); - }); - runExecutionExceptionTest( - () -> { + } + }; + } + runExecutionTest( + runner, + runnable, + true, + o -> { + assertTrue(o.isPresent()); + assertThat(o.get(), instanceOf(Error.class)); + assertThat(o.get(), hasToString(containsString("future error"))); + }); + } + + public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException { + for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) { + final boolean expectExceptionOnExecute = + // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable + // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener + ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE; + checkExecutionException(getExecuteRunner(threadPool.executor(executor)), expectExceptionOnExecute); + + // here, it's ok for the exception not to bubble up. Accessing the future will yield the exception + checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false); + + final boolean expectExceptionOnSchedule = + // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable + // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener + ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE + // scheduler just swallows the exception here + // TODO: bubble these exceptions up + && ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT; + checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule); + } + } + + public void testExecutionExceptionOnDirectExecutorService() throws InterruptedException { + final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService(); + checkExecutionException(getExecuteRunner(directExecutorService), true); + checkExecutionException(getSubmitRunner(directExecutorService), false); + } + + public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws InterruptedException { + final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1, + EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + try { + checkExecutionException(getExecuteRunner(fixedExecutor), true); + checkExecutionException(getSubmitRunner(fixedExecutor), false); + } finally { + ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws InterruptedException { + final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1, + 10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + try { + checkExecutionException(getExecuteRunner(scalingExecutor), true); + checkExecutionException(getSubmitRunner(scalingExecutor), false); + } finally { + ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException { + final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1, + 1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext()); + try { + // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable + // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener + checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), false); + checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false); + } finally { + ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException { + final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test", + EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler()); + try { + checkExecutionException(getExecuteRunner(prioritizedExecutor), true); + checkExecutionException(getSubmitRunner(prioritizedExecutor), false); + checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true); + } finally { + ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); + } + } + + public void testExecutionExceptionOnScheduler() throws InterruptedException { + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + try { + // scheduler just swallows the exceptions + // TODO: bubble these exceptions up + checkExecutionException(getExecuteRunner(scheduler), false); + checkExecutionException(getSubmitRunner(scheduler), false); + checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false); + } finally { + Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS); + } + } + + private void checkExecutionException(Consumer runner, boolean expectException) throws InterruptedException { + logger.info("checking exception for {}", runner); + final Runnable runnable; + final boolean willThrow; + if (randomBoolean()) { + runnable = () -> { + throw new IllegalStateException("future exception"); + }; + willThrow = expectException; + } else { + runnable = new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + + } + + @Override + protected void doRun() { throw new IllegalStateException("future exception"); - }, - false, - o -> assertFalse(o.isPresent())); + } + }; + willThrow = false; + } + runExecutionTest( + runner, + runnable, + willThrow, + o -> { + assertEquals(willThrow, o.isPresent()); + if (willThrow) { + assertThat(o.get(), instanceOf(IllegalStateException.class)); + assertThat(o.get(), hasToString(containsString("future exception"))); + } + }); + } + + Consumer getExecuteRunner(ExecutorService executor) { + return new Consumer() { + @Override + public void accept(Runnable runnable) { + executor.execute(runnable); + } + + @Override + public String toString() { + return "executor(" + executor + ").execute()"; + } + }; + } + + Consumer getSubmitRunner(ExecutorService executor) { + return new Consumer() { + @Override + public void accept(Runnable runnable) { + executor.submit(runnable); + } + + @Override + public String toString() { + return "executor(" + executor + ").submit()"; + } + }; + } + + Consumer getScheduleRunner(String executor) { + return new Consumer() { + @Override + public void accept(Runnable runnable) { + threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable); + } + + @Override + public String toString() { + return "schedule(" + executor + ")"; + } + }; } - private void runExecutionExceptionTest( + private void runExecutionTest( + final Consumer runner, final Runnable runnable, final boolean expectThrowable, final Consumer> consumer) throws InterruptedException { @@ -82,13 +340,18 @@ private void runExecutionExceptionTest( final CountDownLatch supplierLatch = new CountDownLatch(1); - threadPool.generic().submit(() -> { - try { - runnable.run(); - } finally { - supplierLatch.countDown(); - } - }); + try { + runner.accept(() -> { + try { + runnable.run(); + } finally { + supplierLatch.countDown(); + } + }); + } catch (Throwable t) { + consumer.accept(Optional.of(t)); + return; + } supplierLatch.await(); diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 923a76c0acb20..e0525127ee7e7 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -243,6 +243,13 @@ public static Optional maybeError(final Throwable cause, final Logger log return Optional.empty(); } + /** + * See {@link #maybeError(Throwable, Logger)}. Uses the class-local logger. + */ + public static Optional maybeError(final Throwable cause) { + return maybeError(cause, logger); + } + /** * If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be * caught and bubbles up to the uncaught exception handler. Note that the cause tree is examined for any {@link Error}. See diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 44367053406e3..cb358a0596d25 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -27,10 +29,14 @@ import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -108,7 +114,45 @@ public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int new EsAbortPolicy(), contextHolder); } - private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() { + /** + * Checks if the runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown + * during the execution of this task, we need to inspect this runnable and see if it is an error that should be propagated + * to the uncaught exception handler. + */ + public static void rethrowErrors(Runnable runnable) { + if (runnable instanceof RunnableFuture) { + try { + ((RunnableFuture) runnable).get(); + } catch (final Exception e) { + /* + * In theory, Future#get can only throw a cancellation exception, an interrupted exception, or an execution + * exception. We want to ignore cancellation exceptions, restore the interrupt status on interrupted exceptions, and + * inspect the cause of an execution. We are going to be extra paranoid here though and completely unwrap the + * exception to ensure that there is not a buried error anywhere. We assume that a general exception has been + * handled by the executed task or the task submitter. + */ + assert e instanceof CancellationException + || e instanceof InterruptedException + || e instanceof ExecutionException : e; + final Optional maybeError = ExceptionsHelper.maybeError(e); + if (maybeError.isPresent()) { + // throw this error where it will propagate to the uncaught exception handler + throw maybeError.get(); + } + if (e instanceof InterruptedException) { + // restore the interrupt status + Thread.currentThread().interrupt(); + } + } + } + } + + private static final class DirectExecutorService extends AbstractExecutorService { + + @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") + DirectExecutorService() { + super(); + } @Override public void shutdown() { @@ -131,16 +175,18 @@ public boolean isTerminated() { } @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + public boolean awaitTermination(long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override public void execute(Runnable command) { command.run(); + rethrowErrors(command); } + } - }; + private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new DirectExecutorService(); /** * Returns an {@link ExecutorService} that executes submitted tasks on the current thread. This executor service does not support being diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 8bbf0a59ee06d..4bb82e5a01157 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.common.SuppressForbidden; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -48,6 +50,7 @@ final String getName() { this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder); } + @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) { @@ -89,11 +92,8 @@ public interface ShutdownListener { } @Override - public void execute(final Runnable command) { - doExecute(wrapRunnable(command)); - } - - protected void doExecute(final Runnable command) { + public void execute(Runnable command) { + command = wrapRunnable(command); try { super.execute(command); } catch (EsRejectedExecutionException ex) { @@ -115,6 +115,7 @@ protected void doExecute(final Runnable command) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); + EsExecutors.rethrowErrors(unwrap(r)); assert assertDefaultContext(r); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index 8f9245ad58334..a462d36be3cc3 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -96,13 +96,13 @@ private void addPending(List runnables, List pending, boolean /** innerRunnable can be null if task is finished but not removed from executor yet, * see {@link TieBreakingPrioritizedRunnable#run} and {@link TieBreakingPrioritizedRunnable#runAndClean} */ - pending.add(new Pending(unwrap(innerRunnable), t.priority(), t.insertionOrder, executing)); + pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing)); } } else if (runnable instanceof PrioritizedFutureTask) { PrioritizedFutureTask t = (PrioritizedFutureTask) runnable; Object task = t.task; if (t.task instanceof Runnable) { - task = unwrap((Runnable) t.task); + task = super.unwrap((Runnable) t.task); } pending.add(new Pending(task, t.priority, t.insertionOrder, executing)); } @@ -122,7 +122,7 @@ protected void afterExecute(Runnable r, Throwable t) { public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) { command = wrapRunnable(command); - doExecute(command); + execute(command); if (timeout.nanos() >= 0) { if (command instanceof TieBreakingPrioritizedRunnable) { ((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout); @@ -149,6 +149,14 @@ protected Runnable wrapRunnable(Runnable command) { } } + @Override + protected Runnable unwrap(Runnable runnable) { + if (runnable instanceof WrappedRunnable) { + return super.unwrap(((WrappedRunnable) runnable).unwrap()); + } else { + return super.unwrap(runnable); + } + } @Override protected RunnableFuture newTaskFor(Runnable runnable, T value) { @@ -181,7 +189,7 @@ public Pending(Object task, Priority priority, long insertionOrder, boolean exec } } - private final class TieBreakingPrioritizedRunnable extends PrioritizedRunnable { + private final class TieBreakingPrioritizedRunnable extends PrioritizedRunnable implements WrappedRunnable { private Runnable runnable; private final long insertionOrder; @@ -246,11 +254,16 @@ private void runAndClean(Runnable run) { runnable = null; timeoutFuture = null; } + } + @Override + public Runnable unwrap() { + return runnable; } + } - private final class PrioritizedFutureTask extends FutureTask implements Comparable { + private static final class PrioritizedFutureTask extends FutureTask implements Comparable { final Object task; final Priority priority; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java index 7ef2e96e2c5a3..7f0b4ac1a13ff 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java @@ -29,7 +29,7 @@ public abstract class PrioritizedRunnable implements Runnable, Comparable runnableWrapper; + private final Function runnableWrapper; private final ResizableBlockingQueue workQueue; private final int tasksPerFrame; private final int minQueueSize; @@ -60,7 +60,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ResizableBlockingQueue workQueue, int minQueueSize, int maxQueueSize, - Function runnableWrapper, final int tasksPerFrame, + Function runnableWrapper, final int tasksPerFrame, TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, @@ -78,12 +78,18 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto } @Override - protected void doExecute(final Runnable command) { - // we are submitting a task, it has not yet started running (because super.excute() has not - // been called), but it could be immediately run, or run at a later time. We need the time - // this task entered the queue, which we get by creating a TimedRunnable, which starts the - // clock as soon as it is created. - super.doExecute(this.runnableWrapper.apply(command)); + protected Runnable wrapRunnable(Runnable command) { + return super.wrapRunnable(this.runnableWrapper.apply(command)); + } + + @Override + protected Runnable unwrap(Runnable runnable) { + final Runnable unwrapped = super.unwrap(runnable); + if (unwrapped instanceof WrappedRunnable) { + return ((WrappedRunnable) unwrapped).unwrap(); + } else { + return unwrapped; + } } /** @@ -146,11 +152,12 @@ protected void afterExecute(Runnable r, Throwable t) { // total time as a combination of the time in the queue and time spent running the task. We // only want runnables that did not throw errors though, because they could be fast-failures // that throw off our timings, so only check when t is null. - assert r instanceof TimedRunnable : "expected only TimedRunnables in queue"; - final long taskNanos = ((TimedRunnable) r).getTotalNanos(); + assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue"; + final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r); + final long taskNanos = timedRunnable.getTotalNanos(); final long totalNanos = totalTaskNanos.addAndGet(taskNanos); - final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos(); + final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos(); assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos; executionEWMA.addValue(taskExecutionNanos); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 79d7c3510c2d1..2c1011d1d9e53 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -18,10 +18,9 @@ */ package org.elasticsearch.common.util.concurrent; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CloseableThreadLocal; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.io.stream.StreamInput; @@ -32,27 +31,23 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.http.HttpTransportSettings; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE; - import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.nio.charset.StandardCharsets; + +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE; /** @@ -352,11 +347,8 @@ public Runnable preserveContext(Runnable command) { * Unwraps a command that was previously wrapped by {@link #preserveContext(Runnable)}. */ public Runnable unwrap(Runnable command) { - if (command instanceof ContextPreservingAbstractRunnable) { - return ((ContextPreservingAbstractRunnable) command).unwrap(); - } - if (command instanceof ContextPreservingRunnable) { - return ((ContextPreservingRunnable) command).unwrap(); + if (command instanceof WrappedRunnable) { + return ((WrappedRunnable) command).unwrap(); } return command; } @@ -642,7 +634,7 @@ public void close() { /** * Wraps a Runnable to preserve the thread context. */ - private class ContextPreservingRunnable implements Runnable { + private class ContextPreservingRunnable implements WrappedRunnable { private final Runnable in; private final ThreadContext.StoredContext ctx; @@ -658,36 +650,6 @@ public void run() { ctx.restore(); whileRunning = true; in.run(); - if (in instanceof RunnableFuture) { - /* - * The wrapped runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown - * during the execution of this task, we need to inspect this runnable and see if it is an error that should be - * propagated to the uncaught exception handler. - */ - try { - ((RunnableFuture) in).get(); - } catch (final Exception e) { - /* - * In theory, Future#get can only throw a cancellation exception, an interrupted exception, or an execution - * exception. We want to ignore cancellation exceptions, restore the interrupt status on interrupted exceptions, and - * inspect the cause of an execution. We are going to be extra paranoid here though and completely unwrap the - * exception to ensure that there is not a buried error anywhere. We assume that a general exception has been - * handled by the executed task or the task submitter. - */ - assert e instanceof CancellationException - || e instanceof InterruptedException - || e instanceof ExecutionException : e; - final Optional maybeError = ExceptionsHelper.maybeError(e, logger); - if (maybeError.isPresent()) { - // throw this error where it will propagate to the uncaught exception handler - throw maybeError.get(); - } - if (e instanceof InterruptedException) { - // restore the interrupt status - Thread.currentThread().interrupt(); - } - } - } whileRunning = false; } catch (IllegalStateException ex) { if (whileRunning || threadLocal.closed.get() == false) { @@ -704,6 +666,7 @@ public String toString() { return in.toString(); } + @Override public Runnable unwrap() { return in; } @@ -712,7 +675,7 @@ public Runnable unwrap() { /** * Wraps an AbstractRunnable to preserve the thread context. */ - private class ContextPreservingAbstractRunnable extends AbstractRunnable { + private class ContextPreservingAbstractRunnable extends AbstractRunnable implements WrappedRunnable { private final AbstractRunnable in; private final ThreadContext.StoredContext creatorsContext; @@ -773,6 +736,7 @@ public String toString() { return in.toString(); } + @Override public AbstractRunnable unwrap() { return in; } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java index ad5519c0a76df..6d01f5e5cc255 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java @@ -23,7 +23,7 @@ * A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation * through execution as well as only execution time. */ -class TimedRunnable extends AbstractRunnable { +class TimedRunnable extends AbstractRunnable implements WrappedRunnable { private final Runnable original; private final long creationTimeNanos; private long startTimeNanos; @@ -94,4 +94,9 @@ long getTotalExecutionNanos() { return Math.max(finishTimeNanos - startTimeNanos, 1); } + @Override + public Runnable unwrap() { + return original; + } + } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/WrappedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/WrappedRunnable.java new file mode 100644 index 0000000000000..37519968253d6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/WrappedRunnable.java @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +public interface WrappedRunnable extends Runnable { + Runnable unwrap(); +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 2901fc1f7a8ed..1b7c74ed6eec4 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -19,6 +19,7 @@ package org.elasticsearch.threadpool; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -26,8 +27,10 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -37,7 +40,7 @@ public interface Scheduler { static ScheduledThreadPoolExecutor initScheduler(Settings settings) { - ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, + final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); @@ -206,4 +209,30 @@ public void onAfter() { } } } + + /** + * This subclass ensures to properly bubble up Throwable instances of type Error. + */ + class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + + @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") + public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + } + + @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") + public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") + public SafeScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + EsExecutors.rethrowErrors(r); + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java index 3b94c8c960a81..8b162082fcdca 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -227,19 +227,13 @@ public void testExecutionEWMACalculation() throws Exception { context.close(); } - private Function randomBetweenLimitsWrapper(final int minNs, final int maxNs) { - return (runnable) -> { - return new SettableTimedRunnable(randomIntBetween(minNs, maxNs)); - }; - } - - private Function fastWrapper() { + private Function fastWrapper() { return (runnable) -> { return new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100)); }; } - private Function slowWrapper() { + private Function slowWrapper() { return (runnable) -> { return new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2)); }; diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index fa0e0cee1435f..59c3553d25fd2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.junit.After; import org.mockito.ArgumentCaptor; @@ -68,7 +69,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { private final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); private final ScheduledThreadPoolExecutor scheduler = - new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler")); + new Scheduler.SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler")); @After public void shutdownScheduler() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 40bc878cd60ab..0aa47213d6608 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Setting; @@ -17,6 +19,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -748,7 +751,8 @@ private void removeTmpStorage(String jobId) throws IOException { } ExecutorService createAutodetectExecutorService(ExecutorService executorService) { - AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); + AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(logger, + threadPool.getThreadContext()); executorService.submit(autoDetectWorkerExecutor::start); return autoDetectWorkerExecutor; } @@ -758,15 +762,18 @@ ExecutorService createAutodetectExecutorService(ExecutorService executorService) * operations are initially added to a queue and a worker thread from ml autodetect threadpool will process each * operation at a time. */ - class AutodetectWorkerExecutorService extends AbstractExecutorService { + static class AutodetectWorkerExecutorService extends AbstractExecutorService { + private final Logger logger; private final ThreadContext contextHolder; private final CountDownLatch awaitTermination = new CountDownLatch(1); private final BlockingQueue queue = new LinkedBlockingQueue<>(100); private volatile boolean running = true; - AutodetectWorkerExecutorService(ThreadContext contextHolder) { + @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") + AutodetectWorkerExecutorService(Logger logger, ThreadContext contextHolder) { + this.logger = logger; this.contextHolder = contextHolder; } @@ -813,6 +820,7 @@ void start() { } catch (Exception e) { logger.error("error handling job operation", e); } + EsExecutors.rethrowErrors(contextHolder.unwrap(runnable)); } } } catch (InterruptedException e) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderManagerTests.java index 246c96011c2bf..978f1c5286de8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderManagerTests.java @@ -8,6 +8,7 @@ import com.ibm.icu.text.CharsetMatch; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure; import org.junit.After; import org.junit.Before; @@ -21,7 +22,6 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.ml.filestructurefinder.FileStructureOverrides.EMPTY_OVERRIDES; @@ -36,7 +36,7 @@ public class FileStructureFinderManagerTests extends FileStructureTestCase { @Before public void setup() { - scheduler = new ScheduledThreadPoolExecutor(1); + scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1); structureFinderManager = new FileStructureFinderManager(scheduler); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java index ea581f663462f..2770656279cff 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java @@ -8,11 +8,11 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; +import org.elasticsearch.threadpool.Scheduler; import org.junit.After; import org.junit.Before; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; public class TimeoutCheckerTests extends FileStructureTestCase { @@ -20,7 +20,7 @@ public class TimeoutCheckerTests extends FileStructureTestCase { @Before public void createScheduler() { - scheduler = new ScheduledThreadPoolExecutor(1); + scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1); } @After diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 32f6d2fa88311..916393ebba0ac 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -41,6 +42,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.AutodetectWorkerExecutorService; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -78,6 +80,7 @@ import static org.elasticsearch.mock.orig.Mockito.verify; import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; import static org.elasticsearch.mock.orig.Mockito.when; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -669,6 +672,26 @@ public void testCreate_givenNonZeroCountsAndNoModelSnapshotNorQuantiles() { verifyNoMoreInteractions(auditor); } + public void testAutodetectWorkerExecutorServiceDoesNotSwallowErrors() { + final ThreadPool threadPool = new TestThreadPool("testAutodetectWorkerExecutorServiceDoesNotSwallowErrors"); + try { + final AutodetectWorkerExecutorService executor = new AutodetectWorkerExecutorService(logger, threadPool.getThreadContext()); + if (randomBoolean()) { + executor.submit(() -> { + throw new Error("future error"); + }); + } else { + executor.execute(() -> { + throw new Error("future error"); + }); + } + final Error e = expectThrows(Error.class, () -> executor.start()); + assertThat(e.getMessage(), containsString("future error")); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + private AutodetectProcessManager createNonSpyManager(String jobId) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 06867d9c8d83d..e524cd1feb390 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; @@ -83,7 +84,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { @Before public void setUpMocks() { - executor = new ScheduledThreadPoolExecutor(1); + executor = new Scheduler.SafeScheduledThreadPoolExecutor(1); client = mock(Client.class); threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool);