From eac37ad92cc2a516db44b214aa10245b97c39d29 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 30 Jan 2019 14:05:07 +0100 Subject: [PATCH] Log warnings on exceptions in Scheduler Fixed review comments: removed todo, use FutureUtils.cancel and removed scheduler task decoration since this adds more complexity than it benefits. This is a continuation of #28667, #36137 and also fixes #37708. --- .../threadpool/EvilThreadPoolTests.java | 27 +++------ .../org/elasticsearch/ingest/Processor.java | 1 - .../threadpool/CancellableAdapter.java | 5 +- .../elasticsearch/threadpool/Scheduler.java | 58 ++----------------- .../transport/TransportService.java | 2 +- .../threadpool/SchedulerTests.java | 15 +++-- 6 files changed, 24 insertions(+), 84 deletions(-) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index d87259ef35629..02ba33f19c4f8 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -42,12 +42,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Function; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.startsWith; public class EvilThreadPoolTests extends ESTestCase { @@ -166,8 +165,7 @@ protected void doRun() { assertTrue(o.isPresent()); assertThat(o.get(), instanceOf(Error.class)); assertThat(o.get(), hasToString(containsString("future error"))); - }, - Object::toString); + }); } public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException { @@ -237,13 +235,12 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw checkExecutionException(getExecuteRunner(prioritizedExecutor), true); checkExecutionException(getSubmitRunner(prioritizedExecutor), false); - Function logMessageFunction = r -> PrioritizedEsThreadPoolExecutor.class.getName(); // bias towards timeout - checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true, logMessageFunction); + checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true); // race whether timeout or success (but typically biased towards success) - checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true, logMessageFunction); + checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true); // bias towards no timeout. - checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true, logMessageFunction); + checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true); } finally { ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); } @@ -275,11 +272,6 @@ private Runnable delayMillis(Runnable r, int ms) { } private void checkExecutionException(Consumer runner, boolean expectException) throws InterruptedException { - checkExecutionException(runner, expectException, Object::toString); - } - private void checkExecutionException(Consumer runner, - boolean expectException, - Function logMessageFunction) throws InterruptedException { final Runnable runnable; final boolean willThrow; if (randomBoolean()) { @@ -314,8 +306,7 @@ protected void doRun() { assertThat(o.get(), instanceOf(IllegalStateException.class)); assertThat(o.get(), hasToString(containsString("future exception"))); } - }, - logMessageFunction); + }); } Consumer getExecuteRunner(ExecutorService executor) { @@ -364,8 +355,7 @@ private void runExecutionTest( final Consumer runner, final Runnable runnable, final boolean expectThrowable, - final Consumer> consumer, - final Function logMessageFunction) throws InterruptedException { + final Consumer> consumer) throws InterruptedException { final AtomicReference throwableReference = new AtomicReference<>(); final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1); @@ -398,8 +388,9 @@ public void match(LogEvent event) { if (event.getLevel() == Level.WARN) { assertThat("no other warnings than those expected", event.getMessage().getFormattedMessage(), - startsWith("failed to schedule " + logMessageFunction.apply(job))); + equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]")); assertTrue(expectThrowable); + assertNotNull(event.getThrown()); assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown())); uncaughtExceptionHandlerLatch.countDown(); } diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 32d802b4790f1..92b08bba77bf7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -105,7 +105,6 @@ class Parameters { /** * Provides scheduler support */ - // todo: should we promote ScheduledCancellable to somewhere more appropriate for this external use? public final BiFunction scheduler; public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, diff --git a/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java b/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java index 2fff2618534aa..9b5f658c6243e 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java +++ b/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java @@ -19,7 +19,7 @@ package org.elasticsearch.threadpool; -import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.util.concurrent.FutureUtils; import java.util.concurrent.Future; @@ -32,9 +32,8 @@ class CancellableAdapter implements Scheduler.Cancellable { } @Override - @SuppressForbidden(reason="enabler for Cancellable.cancel, never interrupts") public boolean cancel() { - return future.cancel(false); + return FutureUtils.cancel(future); } @Override diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 7ba1c179c49bc..4c1ad6a3715c6 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -32,9 +32,7 @@ import java.util.concurrent.Delayed; import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.RunnableScheduledFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -209,7 +207,7 @@ final class ReschedulingRunnable extends AbstractRunnable implements Cancellable @Override public boolean cancel() { - boolean result = run; + final boolean result = run; run = false; return result; } @@ -252,7 +250,7 @@ public void onAfter() { } /** - * This subclass ensures to properly bubble up Throwable instances of type Error. + * This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks */ class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class); @@ -272,59 +270,13 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize); } - /** - * Decorate task with better toString. - */ - @Override - protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { - return new ToStringRunnableScheduledFuture<>(task, runnable); - } - @Override protected void afterExecute(Runnable r, Throwable t) { Throwable exception = EsExecutors.rethrowErrors(r); if (exception != null) { - logger.warn(() -> new ParameterizedMessage("failed to schedule {}", r.toString()), exception); - } - } - - private class ToStringRunnableScheduledFuture extends FutureTask implements RunnableScheduledFuture { - private final RunnableScheduledFuture task; - private final Runnable runnable; - - private ToStringRunnableScheduledFuture(RunnableScheduledFuture task, Runnable runnable) { - super(runnable, null); - this.task = task; - this.runnable = runnable; - } - - @Override - public boolean isPeriodic() { - return task.isPeriodic(); - } - - @Override - public long getDelay(TimeUnit unit) { - return task.getDelay(unit); - } - - @Override - public int compareTo(Delayed o) { - return -o.compareTo(task); - } - - @SuppressForbidden(reason = "delegation, decorating a better toString, clients should preferably use Cancellable.cancel") - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = super.cancel(mayInterruptIfRunning); - if (cancelled) - remove(this); - return cancelled; - } - - @Override - public String toString() { - return runnable.toString() + ": " + task.toString(); + logger.warn(() -> + new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()), + exception); } } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 94f170253ab51..531354b068c03 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1034,7 +1034,7 @@ public String toString() { return "timeout handler for [" + requestId + "][" + action + "]"; } - public void scheduleTimeout(TimeValue timeout) { + private void scheduleTimeout(TimeValue timeout) { this.cancellable = threadPool.schedule(this, timeout, ThreadPool.Names.GENERIC); } } diff --git a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java index 3f82f1025cc7e..186f9e86b1e76 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java @@ -44,7 +44,7 @@ public void testCancelOnThreadPool() { scheduleAndCancel(threadPool, executed, type)); assertEquals(0, executed.get()); } finally { - threadPool.shutdownNow(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } } @@ -54,7 +54,7 @@ private void scheduleAndCancel(ThreadPool threadPool, AtomicLong executed, Strin assertFalse(scheduled.isCancelled()); assertTrue(scheduled.cancel()); assertTrue(scheduled.isCancelled()); - assertEquals("Cancel must auto-remove",0, schedulerQueueSize(threadPool)); + assertEquals("Cancel must auto-remove", 0, schedulerQueueSize(threadPool)); } private int schedulerQueueSize(ThreadPool threadPool) { @@ -74,10 +74,10 @@ public void testCancelOnScheduler() { assertFalse(scheduled.isCancelled()); assertTrue(scheduled.cancel()); assertTrue(scheduled.isCancelled()); - assertEquals("Cancel must auto-remove",0, executor.getQueue().size()); + assertEquals("Cancel must auto-remove", 0, executor.getQueue().size()); assertEquals(0, executed.get()); } finally { - executor.shutdownNow(); + Scheduler.terminate(executor, 10, TimeUnit.SECONDS); } } @@ -99,9 +99,8 @@ public void testDelay() throws InterruptedException { assertThat(laterDelays, Matchers.contains(initialDelays.stream().map(Matchers::lessThan).collect(Collectors.toList()))); - } finally { - threadPool.shutdownNow(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } } @@ -136,7 +135,7 @@ public void testScheduledOnThreadPool() throws InterruptedException { assertTrue(missingExecutions.await(30, TimeUnit.SECONDS)); } finally { - threadPool.shutdownNow(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } } @@ -151,7 +150,7 @@ public void testScheduledOnScheduler() throws InterruptedException { scheduler.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME); assertTrue(missingExecutions.await(30, TimeUnit.SECONDS)); } finally { - executor.shutdownNow(); + Scheduler.terminate(executor, 10, TimeUnit.SECONDS); } } }