Skip to content

Commit

Permalink
Log warnings on exceptions in Scheduler
Browse files Browse the repository at this point in the history
Fixed review comments: removed todo, use FutureUtils.cancel and removed
scheduler task decoration since this adds more complexity than it
benefits.

This is a continuation of elastic#28667, elastic#36137 and also fixes elastic#37708.
  • Loading branch information
henningandersen committed Jan 30, 2019
1 parent bf7259c commit eac37ad
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;

public class EvilThreadPoolTests extends ESTestCase {

Expand Down Expand Up @@ -166,8 +165,7 @@ protected void doRun() {
assertTrue(o.isPresent());
assertThat(o.get(), instanceOf(Error.class));
assertThat(o.get(), hasToString(containsString("future error")));
},
Object::toString);
});
}

public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
Expand Down Expand Up @@ -237,13 +235,12 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);

Function<Runnable, String> logMessageFunction = r -> PrioritizedEsThreadPoolExecutor.class.getName();
// bias towards timeout
checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true, logMessageFunction);
checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true);
// race whether timeout or success (but typically biased towards success)
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true, logMessageFunction);
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
// bias towards no timeout.
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true, logMessageFunction);
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true);
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -275,11 +272,6 @@ private Runnable delayMillis(Runnable r, int ms) {
}

private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
checkExecutionException(runner, expectException, Object::toString);
}
private void checkExecutionException(Consumer<Runnable> runner,
boolean expectException,
Function<Runnable, String> logMessageFunction) throws InterruptedException {
final Runnable runnable;
final boolean willThrow;
if (randomBoolean()) {
Expand Down Expand Up @@ -314,8 +306,7 @@ protected void doRun() {
assertThat(o.get(), instanceOf(IllegalStateException.class));
assertThat(o.get(), hasToString(containsString("future exception")));
}
},
logMessageFunction);
});
}

Consumer<Runnable> getExecuteRunner(ExecutorService executor) {
Expand Down Expand Up @@ -364,8 +355,7 @@ private void runExecutionTest(
final Consumer<Runnable> runner,
final Runnable runnable,
final boolean expectThrowable,
final Consumer<Optional<Throwable>> consumer,
final Function<Runnable, String> logMessageFunction) throws InterruptedException {
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -398,8 +388,9 @@ public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
startsWith("failed to schedule " + logMessageFunction.apply(job)));
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ class Parameters {
/**
* Provides scheduler support
*/
// todo: should we promote ScheduledCancellable to somewhere more appropriate for this external use?
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;

public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.util.concurrent.FutureUtils;

import java.util.concurrent.Future;

Expand All @@ -32,9 +32,8 @@ class CancellableAdapter implements Scheduler.Cancellable {
}

@Override
@SuppressForbidden(reason="enabler for Cancellable.cancel, never interrupts")
public boolean cancel() {
return future.cancel(false);
return FutureUtils.cancel(future);
}

@Override
Expand Down
58 changes: 5 additions & 53 deletions server/src/main/java/org/elasticsearch/threadpool/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@

import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -209,7 +207,7 @@ final class ReschedulingRunnable extends AbstractRunnable implements Cancellable

@Override
public boolean cancel() {
boolean result = run;
final boolean result = run;
run = false;
return result;
}
Expand Down Expand Up @@ -252,7 +250,7 @@ public void onAfter() {
}

/**
* This subclass ensures to properly bubble up Throwable instances of type Error.
* This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks
*/
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class);
Expand All @@ -272,59 +270,13 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}

/**
* Decorate task with better toString.
*/
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return new ToStringRunnableScheduledFuture<>(task, runnable);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
Throwable exception = EsExecutors.rethrowErrors(r);
if (exception != null) {
logger.warn(() -> new ParameterizedMessage("failed to schedule {}", r.toString()), exception);
}
}

private class ToStringRunnableScheduledFuture<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final RunnableScheduledFuture<V> task;
private final Runnable runnable;

private ToStringRunnableScheduledFuture(RunnableScheduledFuture<V> task, Runnable runnable) {
super(runnable, null);
this.task = task;
this.runnable = runnable;
}

@Override
public boolean isPeriodic() {
return task.isPeriodic();
}

@Override
public long getDelay(TimeUnit unit) {
return task.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
return -o.compareTo(task);
}

@SuppressForbidden(reason = "delegation, decorating a better toString, clients should preferably use Cancellable.cancel")
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled)
remove(this);
return cancelled;
}

@Override
public String toString() {
return runnable.toString() + ": " + task.toString();
logger.warn(() ->
new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()),
exception);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ public String toString() {
return "timeout handler for [" + requestId + "][" + action + "]";
}

public void scheduleTimeout(TimeValue timeout) {
private void scheduleTimeout(TimeValue timeout) {
this.cancellable = threadPool.schedule(this, timeout, ThreadPool.Names.GENERIC);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testCancelOnThreadPool() {
scheduleAndCancel(threadPool, executed, type));
assertEquals(0, executed.get());
} finally {
threadPool.shutdownNow();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand All @@ -54,7 +54,7 @@ private void scheduleAndCancel(ThreadPool threadPool, AtomicLong executed, Strin
assertFalse(scheduled.isCancelled());
assertTrue(scheduled.cancel());
assertTrue(scheduled.isCancelled());
assertEquals("Cancel must auto-remove",0, schedulerQueueSize(threadPool));
assertEquals("Cancel must auto-remove", 0, schedulerQueueSize(threadPool));
}

private int schedulerQueueSize(ThreadPool threadPool) {
Expand All @@ -74,10 +74,10 @@ public void testCancelOnScheduler() {
assertFalse(scheduled.isCancelled());
assertTrue(scheduled.cancel());
assertTrue(scheduled.isCancelled());
assertEquals("Cancel must auto-remove",0, executor.getQueue().size());
assertEquals("Cancel must auto-remove", 0, executor.getQueue().size());
assertEquals(0, executed.get());
} finally {
executor.shutdownNow();
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}

Expand All @@ -99,9 +99,8 @@ public void testDelay() throws InterruptedException {

assertThat(laterDelays,
Matchers.contains(initialDelays.stream().map(Matchers::lessThan).collect(Collectors.toList())));

} finally {
threadPool.shutdownNow();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -136,7 +135,7 @@ public void testScheduledOnThreadPool() throws InterruptedException {

assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
} finally {
threadPool.shutdownNow();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

Expand All @@ -151,7 +150,7 @@ public void testScheduledOnScheduler() throws InterruptedException {
scheduler.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME);
assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
} finally {
executor.shutdownNow();
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}
}

0 comments on commit eac37ad

Please sign in to comment.