Skip to content

Commit

Permalink
Handle scheduler exceptions (elastic#38014)
Browse files Browse the repository at this point in the history
Scheduler.schedule(...) would previously assume that caller handles
exception by calling get() on the returned ScheduledFuture.
schedule() now returns a ScheduledCancellable that no longer gives
access to the exception. Instead, any exception thrown out of a
scheduled Runnable is logged as a warning.

This is a continuation of elastic#28667, elastic#36137 and also fixes elastic#37708.
  • Loading branch information
henningandersen authored Jan 31, 2019
1 parent 7f738e8 commit 68ed72b
Show file tree
Hide file tree
Showing 59 changed files with 617 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -68,7 +67,7 @@ public interface ThreadWatchdog {
static ThreadWatchdog newInstance(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
BiConsumer<Long, Runnable> scheduler) {
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
}

Expand Down Expand Up @@ -105,15 +104,15 @@ class Default implements ThreadWatchdog {
private final long interval;
private final long maxExecutionTime;
private final LongSupplier relativeTimeSupplier;
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
private final BiConsumer<Long, Runnable> scheduler;
private final AtomicInteger registered = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();

private Default(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
BiConsumer<Long, Runnable> scheduler) {
this.interval = interval;
this.maxExecutionTime = maxExecutionTime;
this.relativeTimeSupplier = relativeTimeSupplier;
Expand All @@ -124,7 +123,7 @@ public void register() {
registered.getAndIncrement();
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
if (running.compareAndSet(false, true) == true) {
scheduler.apply(interval, this::interruptLongRunningExecutions);
scheduler.accept(interval, this::interruptLongRunningExecutions);
}
assert previousValue == null;
}
Expand All @@ -149,7 +148,7 @@ private void interruptLongRunningExecutions() {
}
}
if (registered.get() > 0) {
scheduler.apply(interval, this::interruptLongRunningExecutions);
scheduler.accept(interval, this::interruptLongRunningExecutions);
} else {
running.set(false);
}
Expand Down
6 changes: 2 additions & 4 deletions libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BiConsumer;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -418,7 +417,7 @@ public void testExponentialExpressions() {
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
BiConsumer<Long, Runnable> scheduler = (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Expand All @@ -430,7 +429,6 @@ public void testExponentialExpressions() {
}
});
t.start();
return null;
};
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public void testInterrupt() throws Exception {
}
});
thread.start();
return null;
});

Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public List<Setting<?>> getSettings() {
private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler);
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis,
parameters.relativeTimeSupplier, parameters.scheduler::apply);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void onFailure(Exception e) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -323,7 +322,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
worker.rethrottle(1);
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
// While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
Expand Down Expand Up @@ -442,7 +441,7 @@ public void testScrollDelay() throws Exception {
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
capturedDelay.set(delay);
capturedCommand.set(command);
return null;
Expand Down Expand Up @@ -618,7 +617,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
*/
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
/*
* This is called twice:
* 1. To schedule the throttling. When that happens we immediately cancel the task.
Expand All @@ -629,7 +628,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
}
return super.schedule(delay, name, command);
return super.schedule(command, delay, name);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -104,7 +103,7 @@ public ExecutorService executor(String name) {
}

@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
command.run();
return null;
}
Expand Down
Loading

0 comments on commit 68ed72b

Please sign in to comment.