Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle scheduler exceptions #38014

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -68,7 +67,7 @@ public interface ThreadWatchdog {
static ThreadWatchdog newInstance(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
BiConsumer<Long, Runnable> scheduler) {
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
}

Expand Down Expand Up @@ -105,15 +104,15 @@ class Default implements ThreadWatchdog {
private final long interval;
private final long maxExecutionTime;
private final LongSupplier relativeTimeSupplier;
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
private final BiConsumer<Long, Runnable> scheduler;
private final AtomicInteger registered = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();

private Default(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
BiConsumer<Long, Runnable> scheduler) {
this.interval = interval;
this.maxExecutionTime = maxExecutionTime;
this.relativeTimeSupplier = relativeTimeSupplier;
Expand All @@ -124,7 +123,7 @@ public void register() {
registered.getAndIncrement();
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
if (running.compareAndSet(false, true) == true) {
scheduler.apply(interval, this::interruptLongRunningExecutions);
scheduler.accept(interval, this::interruptLongRunningExecutions);
}
assert previousValue == null;
}
Expand All @@ -149,7 +148,7 @@ private void interruptLongRunningExecutions() {
}
}
if (registered.get() > 0) {
scheduler.apply(interval, this::interruptLongRunningExecutions);
scheduler.accept(interval, this::interruptLongRunningExecutions);
} else {
running.set(false);
}
Expand Down
6 changes: 2 additions & 4 deletions libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BiConsumer;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -418,7 +417,7 @@ public void testExponentialExpressions() {
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
BiConsumer<Long, Runnable> scheduler = (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Expand All @@ -430,7 +429,6 @@ public void testExponentialExpressions() {
}
});
t.start();
return null;
};
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public void testInterrupt() throws Exception {
}
});
thread.start();
return null;
});

Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public List<Setting<?>> getSettings() {
private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler);
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis,
parameters.relativeTimeSupplier, parameters.scheduler::apply);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void onFailure(Exception e) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -320,7 +319,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
worker.rethrottle(1);
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
// While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
Expand Down Expand Up @@ -439,7 +438,7 @@ public void testScrollDelay() throws Exception {
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
capturedDelay.set(delay);
capturedCommand.set(command);
return null;
Expand Down Expand Up @@ -615,7 +614,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
*/
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
/*
* This is called twice:
* 1. To schedule the throttling. When that happens we immediately cancel the task.
Expand All @@ -626,7 +625,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
}
return super.schedule(delay, name, command);
return super.schedule(command, delay, name);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -104,7 +103,7 @@ public ExecutorService executor(String name) {
}

@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
command.run();
return null;
}
Expand Down
Loading