diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java index f3515fcfe83b0..602b3e97635ee 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java @@ -20,10 +20,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; /** @@ -68,7 +67,7 @@ public interface ThreadWatchdog { static ThreadWatchdog newInstance(long interval, long maxExecutionTime, LongSupplier relativeTimeSupplier, - BiFunction> scheduler) { + BiConsumer scheduler) { return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler); } @@ -105,7 +104,7 @@ class Default implements ThreadWatchdog { private final long interval; private final long maxExecutionTime; private final LongSupplier relativeTimeSupplier; - private final BiFunction> scheduler; + private final BiConsumer scheduler; private final AtomicInteger registered = new AtomicInteger(0); private final AtomicBoolean running = new AtomicBoolean(false); final ConcurrentHashMap registry = new ConcurrentHashMap<>(); @@ -113,7 +112,7 @@ class Default implements ThreadWatchdog { private Default(long interval, long maxExecutionTime, LongSupplier relativeTimeSupplier, - BiFunction> scheduler) { + BiConsumer scheduler) { this.interval = interval; this.maxExecutionTime = maxExecutionTime; this.relativeTimeSupplier = relativeTimeSupplier; @@ -124,7 +123,7 @@ public void register() { registered.getAndIncrement(); Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong()); if (running.compareAndSet(false, true) == true) { - scheduler.apply(interval, this::interruptLongRunningExecutions); + scheduler.accept(interval, this::interruptLongRunningExecutions); } assert previousValue == null; } @@ -149,7 +148,7 @@ private void interruptLongRunningExecutions() { } } if (registered.get() > 0) { - scheduler.apply(interval, this::interruptLongRunningExecutions); + scheduler.accept(interval, this::interruptLongRunningExecutions); } else { running.set(false); } diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java index 0193811a20d0a..10ed049561899 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java @@ -27,9 +27,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; +import java.util.function.BiConsumer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -418,7 +417,7 @@ public void testExponentialExpressions() { "Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}"; String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " + "Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018"; - BiFunction> scheduler = (delay, command) -> { + BiConsumer scheduler = (delay, command) -> { try { Thread.sleep(delay); } catch (InterruptedException e) { @@ -430,7 +429,6 @@ public void testExponentialExpressions() { } }); t.start(); - return null; }; Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler)); Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine)); diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java index 29e2351215f60..95a50c1fc5c2a 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java @@ -51,7 +51,6 @@ public void testInterrupt() throws Exception { } }); thread.start(); - return null; }); Map registry = ((ThreadWatchdog.Default) watchdog).registry; diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 41e96253f2851..a839e147c77ad 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -111,7 +111,8 @@ public List> getSettings() { private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) { long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis(); long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis(); - return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler); + return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, + parameters.relativeTimeSupplier, parameters.scheduler::apply); } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java index 9264cdde30c75..d729a4d9c3aa7 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -215,7 +215,7 @@ public void onFailure(Exception e) { logger.trace( (Supplier) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); countSearchRetry.run(); - threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this); + threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME); return; } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 3ee651cf96184..92b2180f4da6c 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -93,7 +93,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -323,7 +322,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception { worker.rethrottle(1); setupClient(new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { // While we're here we can check that the sleep made it through assertThat(delay.nanos(), greaterThan(0L)); assertThat(delay.seconds(), lessThanOrEqualTo(10L)); @@ -442,7 +441,7 @@ public void testScrollDelay() throws Exception { AtomicReference capturedCommand = new AtomicReference<>(); setupClient(new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { capturedDelay.set(delay); capturedCommand.set(command); return null; @@ -618,7 +617,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception { */ setupClient(new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { /* * This is called twice: * 1. To schedule the throttling. When that happens we immediately cancel the task. @@ -629,7 +628,7 @@ public ScheduledFuture schedule(TimeValue delay, String name, Runnable comman if (delay.nanos() > 0) { generic().execute(() -> taskManager.cancel(testTask, reason, () -> {})); } - return super.schedule(delay, name, command); + return super.schedule(command, delay, name); } }); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index 559d211b7ed1f..844c6b8351993 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -69,7 +69,6 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -104,7 +103,7 @@ public ExecutorService executor(String name) { } @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { command.run(); return null; } diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index d118df9abde47..02ba33f19c4f8 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -19,6 +19,11 @@ package org.elasticsearch.threadpool; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -26,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; import org.junit.After; import org.junit.Before; @@ -38,6 +44,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -108,7 +115,12 @@ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws In try { checkExecutionError(getExecuteRunner(prioritizedExecutor)); checkExecutionError(getSubmitRunner(prioritizedExecutor)); + // bias towards timeout + checkExecutionError(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r)); + // race whether timeout or success (but typically biased towards success) checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r)); + // bias towards no timeout. + checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r)); } finally { ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); } @@ -170,10 +182,7 @@ public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedE final boolean expectExceptionOnSchedule = // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener - ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE - // scheduler just swallows the exception here - // TODO: bubble these exceptions up - && ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT; + ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE; checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule); } } @@ -219,14 +228,19 @@ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37708") public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException { final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test", EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler()); try { checkExecutionException(getExecuteRunner(prioritizedExecutor), true); checkExecutionException(getSubmitRunner(prioritizedExecutor), false); + + // bias towards timeout + checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true); + // race whether timeout or success (but typically biased towards success) checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true); + // bias towards no timeout. + checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true); } finally { ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); } @@ -235,26 +249,39 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw public void testExecutionExceptionOnScheduler() throws InterruptedException { final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); try { - // scheduler just swallows the exceptions - // TODO: bubble these exceptions up - checkExecutionException(getExecuteRunner(scheduler), false); - checkExecutionException(getSubmitRunner(scheduler), false); - checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false); + checkExecutionException(getExecuteRunner(scheduler), true); + // while submit does return a Future, we choose to log exceptions anyway, + // since this is the semi-internal SafeScheduledThreadPoolExecutor that is being used, + // which also logs exceptions for schedule calls. + checkExecutionException(getSubmitRunner(scheduler), true); + checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), true); } finally { Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS); } } + private Runnable delayMillis(Runnable r, int ms) { + return () -> { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + r.run(); + }; + } + private void checkExecutionException(Consumer runner, boolean expectException) throws InterruptedException { - logger.info("checking exception for {}", runner); final Runnable runnable; final boolean willThrow; if (randomBoolean()) { + logger.info("checking direct exception for {}", runner); runnable = () -> { throw new IllegalStateException("future exception"); }; willThrow = expectException; } else { + logger.info("checking abstract runnable exception for {}", runner); runnable = new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -275,6 +302,7 @@ protected void doRun() { o -> { assertEquals(willThrow, o.isPresent()); if (willThrow) { + if (o.get() instanceof Error) throw (Error) o.get(); assertThat(o.get(), instanceOf(IllegalStateException.class)); assertThat(o.get(), hasToString(containsString("future exception"))); } @@ -313,7 +341,7 @@ Consumer getScheduleRunner(String executor) { return new Consumer() { @Override public void accept(Runnable runnable) { - threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable); + threadPool.schedule(runnable, randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor); } @Override @@ -324,10 +352,10 @@ public String toString() { } private void runExecutionTest( - final Consumer runner, - final Runnable runnable, - final boolean expectThrowable, - final Consumer> consumer) throws InterruptedException { + final Consumer runner, + final Runnable runnable, + final boolean expectThrowable, + final Consumer> consumer) throws InterruptedException { final AtomicReference throwableReference = new AtomicReference<>(); final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1); @@ -335,31 +363,67 @@ private void runExecutionTest( try { Thread.setDefaultUncaughtExceptionHandler((t, e) -> { assertTrue(expectThrowable); - throwableReference.set(e); + assertTrue("Only one message allowed", throwableReference.compareAndSet(null, e)); uncaughtExceptionHandlerLatch.countDown(); }); + final CountDownLatch supplierLatch = new CountDownLatch(1); - try { - runner.accept(() -> { - try { - runnable.run(); - } finally { - supplierLatch.countDown(); + Runnable job = () -> { + try { + runnable.run(); + } finally { + supplierLatch.countDown(); + } + }; + + // snoop on logging to also handle the cases where exceptions are simply logged in Scheduler. + final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class); + final MockLogAppender appender = new MockLogAppender(); + appender.addExpectation( + new MockLogAppender.LoggingExpectation() { + @Override + public void match(LogEvent event) { + if (event.getLevel() == Level.WARN) { + assertThat("no other warnings than those expected", + event.getMessage().getFormattedMessage(), + equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]")); + assertTrue(expectThrowable); + assertNotNull(event.getThrown()); + assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown())); + uncaughtExceptionHandlerLatch.countDown(); + } + } + + @Override + public void assertMatched() { } }); - } catch (Throwable t) { - consumer.accept(Optional.of(t)); - return; - } - supplierLatch.await(); + appender.start(); + Loggers.addAppender(schedulerLogger, appender); + try { + try { + runner.accept(job); + } catch (Throwable t) { + consumer.accept(Optional.of(t)); + return; + } + + supplierLatch.await(); - if (expectThrowable) { - uncaughtExceptionHandlerLatch.await(); + if (expectThrowable) { + uncaughtExceptionHandlerLatch.await(); + } + } finally { + Loggers.removeAppender(schedulerLogger, appender); + appender.stop(); } + consumer.accept(Optional.ofNullable(throwableReference.get())); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); } finally { Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index c083c89567799..02bebb5b38e42 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -203,10 +203,15 @@ public static Builder builder(BiConsumer scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), + buildScheduler(scheduledThreadPoolExecutor), () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); } + private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { + return (command, delay, executor) -> + Scheduler.wrapAsScheduledCancellable(scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + } + private final int bulkActions; private final long bulkSize; @@ -343,7 +348,9 @@ private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler if (flushInterval == null) { return new Scheduler.Cancellable() { @Override - public void cancel() {} + public boolean cancel() { + return false; + } @Override public boolean isCancelled() { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java index 6ec5017e83268..a3c6e27eaf838 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -31,7 +30,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.function.BiConsumer; import java.util.function.Predicate; @@ -89,7 +87,7 @@ static class RetryHandler implements ActionListener { // needed to construct the next bulk request based on the response to the previous one // volatile as we're called from a scheduled thread private volatile BulkRequest currentBulkRequest; - private volatile ScheduledFuture scheduledRequestFuture; + private volatile Scheduler.Cancellable retryCancellable; RetryHandler(BackoffPolicy backoffPolicy, BiConsumer> consumer, ActionListener listener, Scheduler scheduler) { @@ -123,7 +121,9 @@ public void onFailure(Exception e) { try { listener.onFailure(e); } finally { - FutureUtils.cancel(scheduledRequestFuture); + if (retryCancellable != null) { + retryCancellable.cancel(); + } } } @@ -132,7 +132,7 @@ private void retry(BulkRequest bulkRequestForRetry) { TimeValue next = backoff.next(); logger.trace("Retry of bulk request scheduled in {} ms.", next.millis()); Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry)); - scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command); + retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME); } private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) { @@ -166,7 +166,9 @@ private void finishHim() { try { listener.onResponse(getAccumulatedResponse()); } finally { - FutureUtils.cancel(scheduledRequestFuture); + if (retryCancellable != null) { + retryCancellable.cancel(); + } } } diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 7387b03ee822d..d442069b57f93 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -42,7 +42,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; @@ -68,7 +68,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; final class TransportClientNodesService implements Closeable { @@ -100,7 +99,7 @@ final class TransportClientNodesService implements Closeable { private final NodeSampler nodesSampler; - private volatile ScheduledFuture nodesSamplerFuture; + private volatile Scheduler.Cancellable nodesSamplerCancellable; private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt()); @@ -146,7 +145,7 @@ final class TransportClientNodesService implements Closeable { this.nodesSampler = new SimpleNodeSampler(); } this.hostFailureListener = hostFailureListener; - this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler()); + this.nodesSamplerCancellable = threadPool.schedule(new ScheduledNodeSampler(), nodesSamplerInterval, ThreadPool.Names.GENERIC); } public List transportAddresses() { @@ -325,7 +324,9 @@ public void close() { return; } closed = true; - FutureUtils.cancel(nodesSamplerFuture); + if (nodesSamplerCancellable != null) { + nodesSamplerCancellable.cancel(); + } for (DiscoveryNode node : nodes) { transportService.disconnectFromNode(node); } @@ -392,7 +393,7 @@ public void run() { try { nodesSampler.sample(); if (!closed) { - nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this); + nodesSamplerCancellable = threadPool.schedule(this, nodesSamplerInterval, ThreadPool.Names.GENERIC); } } catch (Exception e) { logger.warn("failed to sample", e); diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 90526aaa9fd21..d6c2824fdbb10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -31,10 +31,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.discovery.zen.MasterFaultDetection; import org.elasticsearch.discovery.zen.NodesFaultDetection; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -42,7 +42,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import static org.elasticsearch.common.settings.Setting.Property; import static org.elasticsearch.common.settings.Setting.positiveTimeSetting; @@ -71,7 +70,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { private final TimeValue reconnectInterval; - private volatile ScheduledFuture backgroundFuture = null; + private volatile Scheduler.Cancellable backgroundCancellable = null; @Inject public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) { @@ -187,19 +186,21 @@ protected void doRun() { @Override public void onAfter() { if (lifecycle.started()) { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this); + backgroundCancellable = threadPool.schedule(this, reconnectInterval, ThreadPool.Names.GENERIC); } } } @Override protected void doStart() { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker()); + backgroundCancellable = threadPool.schedule(new ConnectionChecker(), reconnectInterval, ThreadPool.Names.GENERIC); } @Override protected void doStop() { - FutureUtils.cancel(backgroundFuture); + if (backgroundCancellable != null) { + backgroundCancellable.cancel(); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 60b38861a3ad5..d73d33a0635c0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -976,7 +976,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) new ListenableFuture<>(), ackListener, publishListener); currentPublication = Optional.of(publication); - transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() { + transportService.getThreadPool().schedule(new Runnable() { @Override public void run() { synchronized (mutex) { @@ -988,7 +988,7 @@ public void run() { public String toString() { return "scheduled timeout for " + publication; } - }); + }, publishTimeout, Names.GENERIC); final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes(); leaderChecker.setCurrentNodes(publishNodes); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index cd40e8e0f1f66..caae478e97a12 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -392,7 +392,7 @@ public String toString() { } private void scheduleNextWakeUp() { - transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() { + transportService.getThreadPool().schedule(new Runnable() { @Override public void run() { handleWakeUp(); @@ -402,7 +402,7 @@ public void run() { public String toString() { return FollowerChecker.this + "::handleWakeUp"; } - }); + }, followerCheckInterval, Names.SAME); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 0f387fca18a13..a0b368e4e1dcc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -301,7 +301,7 @@ void handleDisconnectedNode(DiscoveryNode discoveryNode) { private void scheduleNextWakeUp() { logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval); - transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() { + transportService.getThreadPool().schedule(new Runnable() { @Override public void run() { handleWakeUp(); @@ -311,7 +311,7 @@ public void run() { public String toString() { return "scheduled check of leader " + leader; } - }); + }, leaderCheckInterval, Names.SAME); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java index c9cc5b628e292..82d459f7f3cce 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java @@ -32,10 +32,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -69,7 +68,7 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme class DelayedRerouteTask extends ClusterStateUpdateTask { final TimeValue nextDelay; // delay until submitting the reroute command final long baseTimestampNanos; // timestamp (in nanos) upon which delay was calculated - volatile ScheduledFuture future; + volatile Scheduler.Cancellable cancellable; final AtomicBoolean cancelScheduling = new AtomicBoolean(); DelayedRerouteTask(TimeValue nextDelay, long baseTimestampNanos) { @@ -83,12 +82,14 @@ public long scheduledTimeToRunInNanos() { public void cancelScheduling() { cancelScheduling.set(true); - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } removeIfSameTask(this); } public void schedule() { - future = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { + cancellable = threadPool.schedule(new AbstractRunnable() { @Override protected void doRun() throws Exception { if (cancelScheduling.get()) { @@ -102,7 +103,7 @@ public void onFailure(Exception e) { logger.warn("failed to submit schedule/execute reroute post unassigned shard", e); removeIfSameTask(DelayedRerouteTask.this); } - }); + }, nextDelay, ThreadPool.Names.SAME); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 496ee9040a899..313ff4c660866 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -42,9 +42,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; @@ -55,7 +55,6 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -281,7 +280,7 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC public void run() { if (timeout != null) { NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); - notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout); + notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, ThreadPool.Names.GENERIC); onGoingTimeouts.add(notifyTimeout); } timeoutClusterStateListeners.add(listener); @@ -526,7 +525,7 @@ protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) class NotifyTimeout implements Runnable { final TimeoutClusterStateListener listener; final TimeValue timeout; - volatile ScheduledFuture future; + volatile Scheduler.Cancellable cancellable; NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) { this.listener = listener; @@ -534,12 +533,14 @@ class NotifyTimeout implements Runnable { } public void cancel() { - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } } @Override public void run() { - if (future != null && future.isCancelled()) { + if (cancellable != null && cancellable.isCancelled()) { return; } if (lifecycle.stoppedOrClosed()) { diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 50cf4ffe9de8a..beb42fa1c6814 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -52,6 +52,7 @@ import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; @@ -60,7 +61,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -606,7 +606,7 @@ private static class AckCountDownListener implements Discovery.AckListener { private final DiscoveryNode masterNode; private final ThreadPool threadPool; private final long clusterStateVersion; - private volatile Future ackTimeoutCallback; + private volatile Scheduler.Cancellable ackTimeoutCallback; private Exception lastFailure; AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, @@ -638,10 +638,10 @@ public void onCommit(TimeValue commitTime) { } else if (countDown.countDown()) { finish(); } else { - this.ackTimeoutCallback = threadPool.schedule(timeLeft, ThreadPool.Names.GENERIC, this::onTimeout); + this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, ThreadPool.Names.GENERIC); // re-check if onNodeAck has not completed while we were scheduling the timeout if (countDown.isCountedDown()) { - FutureUtils.cancel(ackTimeoutCallback); + ackTimeoutCallback.cancel(); } } } @@ -666,7 +666,9 @@ public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { private void finish() { logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion); - FutureUtils.cancel(ackTimeoutCallback); + if (ackTimeoutCallback != null) { + ackTimeoutCallback.cancel(); + } ackedTaskListener.onAllNodesAcked(lastFailure); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index e06e1a41907db..3c1716cda1522 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -21,11 +21,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.util.Objects; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -37,7 +37,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { private final ThreadPool threadPool; private final AtomicBoolean closed = new AtomicBoolean(false); private final boolean autoReschedule; - private volatile ScheduledFuture scheduledFuture; + private volatile Scheduler.Cancellable cancellable; private volatile boolean isScheduledOrRunning; private volatile Exception lastThrownException; private volatile TimeValue interval; @@ -56,7 +56,7 @@ protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue inte */ public synchronized void setInterval(TimeValue interval) { this.interval = interval; - if (scheduledFuture != null) { + if (cancellable != null) { rescheduleIfNecessary(); } } @@ -84,18 +84,18 @@ public synchronized void rescheduleIfNecessary() { if (isClosed()) { return; } - if (scheduledFuture != null) { - FutureUtils.cancel(scheduledFuture); + if (cancellable != null) { + cancellable.cancel(); } if (interval.millis() > 0 && mustReschedule()) { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - scheduledFuture = threadPool.schedule(interval, getThreadPool(), this); + cancellable = threadPool.schedule(this, interval, getThreadPool()); isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); - scheduledFuture = null; + cancellable = null; isScheduledOrRunning = false; } } @@ -110,8 +110,10 @@ public boolean isScheduled() { * Cancel any scheduled run, but do not prevent subsequent restarts. */ public synchronized void cancel() { - FutureUtils.cancel(scheduledFuture); - scheduledFuture = null; + if (cancellable != null) { + cancellable.cancel(); + cancellable = null; + } isScheduledOrRunning = false; } @@ -132,7 +134,7 @@ public boolean isClosed() { @Override public final void run() { synchronized (this) { - scheduledFuture = null; + cancellable = null; isScheduledOrRunning = autoReschedule; } try { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index cb358a0596d25..29cd7f6682a64 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -118,8 +118,11 @@ public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int * Checks if the runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown * during the execution of this task, we need to inspect this runnable and see if it is an error that should be propagated * to the uncaught exception handler. + * + * @param runnable the runnable to inspect, should be a RunnableFuture + * @return non fatal exception or null if no exception. */ - public static void rethrowErrors(Runnable runnable) { + public static Throwable rethrowErrors(Runnable runnable) { if (runnable instanceof RunnableFuture) { try { ((RunnableFuture) runnable).get(); @@ -143,8 +146,13 @@ public static void rethrowErrors(Runnable runnable) { // restore the interrupt status Thread.currentThread().interrupt(); } + if (e instanceof ExecutionException) { + return e.getCause(); + } } } + + return null; } private static final class DirectExecutorService extends AbstractExecutorService { diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index 8a311c4506fa1..4c1fbcd702afa 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -128,7 +128,7 @@ private void innerStart(final DiscoveryNode masterNode) { this.masterPinger = new MasterPinger(); // we start pinging slightly later to allow the chosen master to complete it's own master election - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger); + threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME); } public void stop(String reason) { @@ -174,7 +174,7 @@ protected void handleTransportDisconnect(DiscoveryNode node) { } this.masterPinger = new MasterPinger(); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later - threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger); + threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME); } catch (Exception e) { logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode); notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)"); @@ -218,7 +218,7 @@ public void run() { final DiscoveryNode masterToPing = masterNode; if (masterToPing == null) { // master is null, should not happen, but we are still running, so reschedule - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); + threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME); return; } @@ -243,7 +243,7 @@ public void handleResponse(MasterPingResponseResponse response) { // check if the master node did not get switched on us..., if it did, we simply return with no reschedule if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { // we don't stop on disconnection from master, we keep pinging it - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); + threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index 88ba868289ec2..4b89c9f19dc59 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -132,7 +132,7 @@ public void updateNodesAndPing(ClusterState clusterState) { // it's OK to overwrite an existing nodeFD - it will just stop and the new one will pick things up. nodesFD.put(node, fd); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later - threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd); + threadPool.schedule(fd, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME); } } } @@ -161,7 +161,7 @@ protected void handleTransportDisconnect(DiscoveryNode node) { transportService.connectToNode(node); nodesFD.put(node, fd); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later - threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd); + threadPool.schedule(fd, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME); } catch (Exception e) { logger.trace("[node ] [{}] transport disconnected (with verified connect)", node); // clean up if needed, just to be safe.. @@ -240,7 +240,7 @@ public void handleResponse(PingResponse response) { return; } retryCount = 0; - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this); + threadPool.schedule(NodeFD.this, pingInterval, ThreadPool.Names.SAME); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index c9470984b1487..e1261b3d322a3 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -304,9 +304,9 @@ protected void doRun() throws Exception { } }; threadPool.generic().execute(pingSender); - threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender); - threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender); - threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() { + threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC); + threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC); + threadPool.schedule(new AbstractRunnable() { @Override protected void doRun() throws Exception { finishPingingRound(pingingRound); @@ -316,7 +316,7 @@ protected void doRun() throws Exception { public void onFailure(Exception e) { logger.warn("unexpected error while finishing pinging round", e); } - }); + }, scheduleDuration, ThreadPool.Names.GENERIC); } // for testing @@ -557,8 +557,8 @@ private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) temporalResponses.add(request.pingResponse); // add to any ongoing pinging activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse)); - threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, - () -> temporalResponses.remove(request.pingResponse)); + threadPool.schedule(() -> temporalResponses.remove(request.pingResponse), + TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME); List pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses); pingResponses.add(createPingResponse(contextProvider.clusterState())); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index c71ae40235056..04eb14669e7a2 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -206,12 +206,12 @@ private void performStateRecovery(final boolean enforceRecoverAfterTime, final S if (enforceRecoverAfterTime && recoverAfterTime != null) { if (scheduledRecovery.compareAndSet(false, true)) { logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason); - threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, () -> { + threadPool.schedule(() -> { if (recovered.compareAndSet(false, true)) { logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); recoveryRunnable.run(); } - }); + }, recoverAfterTime, ThreadPool.Names.GENERIC); } } else { if (recovered.compareAndSet(false, true)) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java index 4fc5770709c1b..454983ba7942a 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java @@ -155,7 +155,7 @@ public void onFailure(Exception e) { TimeValue delay = retries.next(); logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); countSearchRetry.run(); - threadPool.schedule(delay, ThreadPool.Names.SAME, retryWithContext); + threadPool.schedule(retryWithContext, delay, ThreadPool.Names.SAME); } else { logger.warn(() -> new ParameterizedMessage( "giving up on search because we retried [{}] times without success", retryCount), e); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java index 17bf59a104a80..ae2a6a552cba4 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java @@ -24,11 +24,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -173,10 +172,10 @@ TimeValue throttledUntil() { if (delayed == null) { return timeValueNanos(0); } - if (delayed.future == null) { + if (delayed.scheduled == null) { return timeValueNanos(0); } - return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); + return timeValueNanos(max(0, delayed.scheduled.getDelay(TimeUnit.NANOSECONDS))); } /** @@ -249,16 +248,16 @@ class DelayedPrepareBulkRequest { private final ThreadPool threadPool; private final Runnable command; private final float requestsPerSecond; - private final ScheduledFuture future; + private final Scheduler.ScheduledCancellable scheduled; DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) { this.threadPool = threadPool; this.requestsPerSecond = requestsPerSecond; this.command = command; - this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, () -> { + this.scheduled = threadPool.schedule(() -> { throttledNanos.addAndGet(delay.nanos()); command.run(); - }); + }, delay, ThreadPool.Names.GENERIC); } DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) { @@ -272,9 +271,9 @@ DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) { return this; } - long remainingDelay = future.getDelay(TimeUnit.NANOSECONDS); + long remainingDelay = scheduled.getDelay(TimeUnit.NANOSECONDS); // Actually reschedule the task - if (false == FutureUtils.cancel(future)) { + if (scheduled == null || false == scheduled.cancel()) { // Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here. logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId()); return this; diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 9113ba3f23b14..4682cd4dff421 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -198,7 +198,7 @@ public class IndicesService extends AbstractLifecycleComponent @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically - threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.cacheCleaner); + threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME); } public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry, @@ -1164,7 +1164,7 @@ public void run() { } // Reschedule itself to run again if not closed if (closed.get() == false) { - threadPool.schedule(interval, ThreadPool.Names.SAME, this); + threadPool.schedule(this, interval, ThreadPool.Names.SAME); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index dbbaed1132e62..74585dcc261c2 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -164,7 +164,7 @@ protected void retryRecovery(final long recoveryId, final String reason, TimeVal private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) { RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout); if (newTarget != null) { - threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId())); + threadPool.schedule(new RecoveryRunner(newTarget.recoveryId()), retryAfter, ThreadPool.Names.GENERIC); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 6b442750c1898..d08d7b6d0bd98 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -76,8 +76,8 @@ private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue acti assert existingTarget == null : "found two RecoveryStatus instances with the same id"; logger.trace("{} started recovery from {}, id [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode(), recoveryTarget.recoveryId()); - threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC, - new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout)); + threadPool.schedule(new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout), + activityTimeout, ThreadPool.Names.GENERIC); } /** @@ -289,7 +289,7 @@ protected void doRun() throws Exception { } lastSeenAccessTime = accessTime; logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime); - threadPool.schedule(checkInterval, ThreadPool.Names.GENERIC, this); + threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 00b04bff2e5fd..b2143d72ae65f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -95,7 +95,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, env, scriptService, analysisRegistry, threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( - TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command + command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC ), this ) ); diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 498ec3a77104f..92b08bba77bf7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -23,9 +23,9 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.Scheduler; import java.util.Map; -import java.util.concurrent.ScheduledFuture; import java.util.function.BiFunction; import java.util.function.LongSupplier; @@ -105,10 +105,10 @@ class Parameters { /** * Provides scheduler support */ - public final BiFunction> scheduler; + public final BiFunction scheduler; public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, - LongSupplier relativeTimeSupplier, BiFunction> scheduler, + LongSupplier relativeTimeSupplier, BiFunction scheduler, IngestService ingestService) { this.env = env; this.scriptService = scriptService; diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index b05e87db91943..77a873316a53d 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -190,7 +190,7 @@ public void onFailure(Exception e) { } else { TimeValue wait = backoff.next(); logger.warn(() -> new ParameterizedMessage("failed to store task result, retrying in [{}]", wait), e); - threadPool.schedule(wait, ThreadPool.Names.SAME, () -> doStoreResult(backoff, index, listener)); + threadPool.schedule(() -> doStoreResult(backoff, index, listener), wait, ThreadPool.Names.SAME); } } }); diff --git a/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java b/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java new file mode 100644 index 0000000000000..9b5f658c6243e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.util.concurrent.FutureUtils; + +import java.util.concurrent.Future; + +class CancellableAdapter implements Scheduler.Cancellable { + private Future future; + + CancellableAdapter(Future future) { + assert future != null; + this.future = future; + } + + @Override + public boolean cancel() { + return FutureUtils.cancel(future); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScheduledCancellableAdapter.java b/server/src/main/java/org/elasticsearch/threadpool/ScheduledCancellableAdapter.java new file mode 100644 index 0000000000000..67448225bf3d4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/threadpool/ScheduledCancellableAdapter.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.util.concurrent.FutureUtils; + +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +class ScheduledCancellableAdapter implements Scheduler.ScheduledCancellable { + private final ScheduledFuture scheduledFuture; + + ScheduledCancellableAdapter(ScheduledFuture scheduledFuture) { + assert scheduledFuture != null; + this.scheduledFuture = scheduledFuture; + } + + @Override + public long getDelay(TimeUnit unit) { + return scheduledFuture.getDelay(unit); + } + + @Override + public int compareTo(Delayed other) { + // unwrap other by calling on it. + return -other.compareTo(scheduledFuture); + } + + @Override + public boolean cancel() { + return FutureUtils.cancel(scheduledFuture); + } + + @Override + public boolean isCancelled() { + return scheduledFuture.isCancelled(); + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 1b7c74ed6eec4..4c1ad6a3715c6 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -19,6 +19,9 @@ package org.elasticsearch.threadpool; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -27,6 +30,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import java.util.concurrent.Delayed; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -39,6 +44,14 @@ */ public interface Scheduler { + /** + * Create a scheduler that can be used client side. Server side, please use ThreadPool.schedule instead. + * + * Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started + * using execute, submit and schedule. + * @param settings the settings to use + * @return executor + */ static ScheduledThreadPoolExecutor initScheduler(Settings settings) { final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); @@ -85,16 +98,16 @@ default Runnable preserveContext(Runnable command) { * The command runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow * to execute on a different executor, in which case blocking calls are allowed. * + * @param command the command to run * @param delay delay before the task executes * @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used * by subclasses that support multiple executors. - * @param command the command to run * @return a ScheduledFuture who's get will return when the task has been added to its target thread pool and throws an exception if * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool * the ScheduledFuture cannot interact with it. * @throws EsRejectedExecutionException if the task cannot be scheduled for execution */ - ScheduledFuture schedule(TimeValue delay, String executor, Runnable command); + ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor); /** * Schedules a periodic action that runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow @@ -111,6 +124,25 @@ default Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, return new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {}); } + /** + * Utility method to wrap a Future as a Cancellable + * @param future the future to wrap + * @return a cancellable delegating to the future + */ + static Cancellable wrapAsCancellable(Future future) { + return new CancellableAdapter(future); + } + + /** + * Utility method to wrap a ScheduledFuture as a ScheduledCancellable + * @param scheduledFuture the scheduled future to wrap + * @return a SchedulecCancellable delegating to the scheduledFuture + */ + static ScheduledCancellable wrapAsScheduledCancellable(ScheduledFuture scheduledFuture) { + return new ScheduledCancellableAdapter(scheduledFuture); + } + + /** * This interface represents an object whose execution may be cancelled during runtime. */ @@ -119,7 +151,7 @@ interface Cancellable { /** * Cancel the execution of this object. This method is idempotent. */ - void cancel(); + boolean cancel(); /** * Check if the execution has been cancelled @@ -128,6 +160,11 @@ interface Cancellable { boolean isCancelled(); } + /** + * A scheduled cancellable allow cancelling and reading the remaining delay of a scheduled task. + */ + interface ScheduledCancellable extends Delayed, Cancellable { } + /** * This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value * for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between @@ -165,12 +202,14 @@ final class ReschedulingRunnable extends AbstractRunnable implements Cancellable this.scheduler = scheduler; this.rejectionConsumer = rejectionConsumer; this.failureConsumer = failureConsumer; - scheduler.schedule(interval, executor, this); + scheduler.schedule(this, interval, executor); } @Override - public void cancel() { + public boolean cancel() { + final boolean result = run; run = false; + return result; } @Override @@ -202,7 +241,7 @@ public void onAfter() { // if this has not been cancelled reschedule it to run again if (run) { try { - scheduler.schedule(interval, executor, this); + scheduler.schedule(this, interval, executor); } catch (final EsRejectedExecutionException e) { onRejection(e); } @@ -211,9 +250,10 @@ public void onAfter() { } /** - * This subclass ensures to properly bubble up Throwable instances of type Error. + * This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks */ class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class); @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { @@ -232,7 +272,12 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) { @Override protected void afterExecute(Runnable r, Throwable t) { - EsExecutors.rethrowErrors(r); + Throwable exception = EsExecutors.rethrowErrors(r); + if (exception != null) { + logger.warn(() -> + new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()), + exception); + } } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index e4eaf20725b63..d42abf6b4e94b 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -55,7 +55,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -336,27 +335,27 @@ public ExecutorService executor(String name) { * context of the calling thread you may call threadPool.getThreadContext().preserveContext on the runnable before passing * it to this method. * + * @param command the command to run * @param delay delay before the task executes * @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the * command completes. - * @param command the command to run * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool * the ScheduledFuture will cannot interact with it. * @throws org.elasticsearch.common.util.concurrent.EsRejectedExecutionException if the task cannot be scheduled for execution */ @Override - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { if (!Names.SAME.equals(executor)) { command = new ThreadedRunnable(command, executor(executor)); } - return scheduler.schedule(new ThreadPool.LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS); + return new ScheduledCancellableAdapter(scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); } public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { try { - schedule(delay, executor, command); + schedule(command, delay, executor); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { logger.debug(new ParameterizedMessage("could not schedule execution of [{}] after [{}] on [{}] as executor is shut down", diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index e0b466e2ec631..2ff5ae1583e37 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -326,7 +326,7 @@ private List initiateConnection(DiscoveryNode node, ConnectionProfil } TimeValue connectTimeout = connectionProfile.getConnectTimeout(); - threadPool.schedule(connectTimeout, ThreadPool.Names.GENERIC, channelsConnectedListener::onTimeout); + threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC); return channels; } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 3497b29d6d0d7..8db953e4e9598 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -73,8 +73,10 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV final Version minCompatVersion = version.minimumCompatibilityVersion(); handshakeRequestSender.sendRequest(node, channel, requestId, minCompatVersion); - threadPool.schedule(timeout, ThreadPool.Names.GENERIC, - () -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]"))); + threadPool.schedule( + () -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")), + timeout, + ThreadPool.Names.GENERIC); success = true; } catch (Exception e) { handler.handleLocalException(new ConnectTransportException(node, "failure to send " + HANDSHAKE_ACTION_NAME, e)); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java index 8f17377c2a29f..571ced1c118f9 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java @@ -157,7 +157,7 @@ private ScheduledPing(TimeValue pingInterval) { void ensureStarted() { if (isStarted.get() == false && isStarted.compareAndSet(false, true)) { - threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this); + threadPool.schedule(this, pingInterval, ThreadPool.Names.GENERIC); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 57aaba671518e..531354b068c03 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -42,14 +42,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -65,7 +66,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -622,7 +622,7 @@ private void sendRequestInternal(final Transport.C } if (timeoutHandler != null) { assert options.timeout() != null; - timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler); + timeoutHandler.scheduleTimeout(options.timeout()); } connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream } catch (final Exception e) { @@ -989,7 +989,7 @@ final class TimeoutHandler implements Runnable { private final long sentTime = threadPool.relativeTimeInMillis(); private final String action; private final DiscoveryNode node; - volatile ScheduledFuture future; + volatile Scheduler.Cancellable cancellable; TimeoutHandler(long requestId, DiscoveryNode node, String action) { this.requestId = requestId; @@ -1024,13 +1024,19 @@ public void run() { public void cancel() { assert responseHandlers.contains(requestId) == false : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } } @Override public String toString() { return "timeout handler for [" + requestId + "][" + action + "]"; } + + private void scheduleTimeout(TimeValue timeout) { + this.cancellable = threadPool.schedule(this, timeout, ThreadPool.Names.GENERIC); + } } static class TimeoutInfoHolder { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 6a7d9bc02ec3e..e2527397a780a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -96,7 +96,7 @@ public void testAwaitOnCloseCallsOnClose() throws Exception { BiConsumer> consumer = (request, listener) -> {}; BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), 0, 10, new ByteSizeValue(1000), null, - (delay, executor, command) -> null, () -> called.set(true), BulkRequest::new); + (command, delay, executor) -> null, () -> called.set(true), BulkRequest::new); assertFalse(called.get()); bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS); diff --git a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index db624798bb71c..a76fced9772f5 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -33,10 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; @@ -149,9 +146,9 @@ public void testDelayAndRethrottle() throws IOException, InterruptedException { int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond); ThreadPool threadPool = new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { assertThat(delay.nanos(), both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(maxDelay.nanos()))); - return super.schedule(delay, name, command); + return super.schedule(command, delay, name); } }; try { @@ -202,8 +199,8 @@ public void onFailure(Exception e) { public void testDelayNeverNegative() throws IOException { // Thread pool that returns a ScheduledFuture that claims to have a negative delay ThreadPool threadPool = new TestThreadPool("test") { - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { - return new ScheduledFuture() { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { + return new ScheduledCancellable() { @Override public long getDelay(TimeUnit unit) { return -1; @@ -215,7 +212,7 @@ public int compareTo(Delayed o) { } @Override - public boolean cancel(boolean mayInterruptIfRunning) { + public boolean cancel() { throw new UnsupportedOperationException(); } @@ -223,21 +220,6 @@ public boolean cancel(boolean mayInterruptIfRunning) { public boolean isCancelled() { throw new UnsupportedOperationException(); } - - @Override - public boolean isDone() { - throw new UnsupportedOperationException(); - } - - @Override - public Void get() throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - throw new UnsupportedOperationException(); - } }; } }; diff --git a/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java b/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java index b412aa5755d4f..5b60c82feb14f 100644 --- a/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java @@ -175,7 +175,8 @@ interface TriFunction { private static class MockCancellable implements Cancellable { @Override - public void cancel() { + public boolean cancel() { + return true; } @Override diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java index f13a5f4e3bddb..785552124ea26 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java @@ -33,7 +33,6 @@ import org.junit.Before; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -83,7 +82,7 @@ public void testDoesNotRescheduleUntilExecutionFinished() throws Exception { ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool, (e) -> {}, (e) -> {}); // this call was made during construction of the runnable - verify(threadPool, times(1)).schedule(delay, Names.GENERIC, reschedulingRunnable); + verify(threadPool, times(1)).schedule(reschedulingRunnable, delay, Names.GENERIC); // create a thread and start the runnable Thread runThread = new Thread() { @@ -103,7 +102,7 @@ public void run() { runThread.join(); // validate schedule was called again - verify(threadPool, times(2)).schedule(delay, Names.GENERIC, reschedulingRunnable); + verify(threadPool, times(2)).schedule(reschedulingRunnable, delay, Names.GENERIC); } public void testThatRunnableIsRescheduled() throws Exception { @@ -251,7 +250,7 @@ public void testOnRejectionCausesCancellation() throws Exception { terminate(threadPool); threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "fixed delay tests").build()) { @Override - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { if (command instanceof ReschedulingRunnable) { ((ReschedulingRunnable) command).onRejection(new EsRejectedExecutionException()); } else { diff --git a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java new file mode 100644 index 0000000000000..186f9e86b1e76 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +public class SchedulerTests extends ESTestCase { + + public void testCancelOnThreadPool() { + ThreadPool threadPool = new TestThreadPool("test"); + AtomicLong executed = new AtomicLong(); + try { + ThreadPool.THREAD_POOL_TYPES.keySet().forEach(type -> + scheduleAndCancel(threadPool, executed, type)); + assertEquals(0, executed.get()); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + private void scheduleAndCancel(ThreadPool threadPool, AtomicLong executed, String type) { + Scheduler.ScheduledCancellable scheduled = threadPool.schedule(executed::incrementAndGet, TimeValue.timeValueSeconds(20), type); + assertEquals(1, schedulerQueueSize(threadPool)); + assertFalse(scheduled.isCancelled()); + assertTrue(scheduled.cancel()); + assertTrue(scheduled.isCancelled()); + assertEquals("Cancel must auto-remove", 0, schedulerQueueSize(threadPool)); + } + + private int schedulerQueueSize(ThreadPool threadPool) { + return ((Scheduler.SafeScheduledThreadPoolExecutor) threadPool.scheduler()).getQueue().size(); + } + + public void testCancelOnScheduler() { + ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY); + Scheduler scheduler = (command, delay, name) -> + Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + + AtomicLong executed = new AtomicLong(); + try { + Scheduler.ScheduledCancellable scheduled = + scheduler.schedule(executed::incrementAndGet, TimeValue.timeValueSeconds(20), ThreadPool.Names.SAME); + assertEquals(1, executor.getQueue().size()); + assertFalse(scheduled.isCancelled()); + assertTrue(scheduled.cancel()); + assertTrue(scheduled.isCancelled()); + assertEquals("Cancel must auto-remove", 0, executor.getQueue().size()); + assertEquals(0, executed.get()); + } finally { + Scheduler.terminate(executor, 10, TimeUnit.SECONDS); + } + } + + + public void testDelay() throws InterruptedException { + ThreadPool threadPool = new TestThreadPool("test"); + try { + List jobs = LongStream.range(20,30) + .mapToObj(delay -> threadPool.schedule(() -> {}, + TimeValue.timeValueSeconds(delay), + ThreadPool.Names.SAME)) + .collect(Collectors.toCollection(ArrayList::new)); + + Collections.reverse(jobs); + + List initialDelays = verifyJobDelays(jobs); + Thread.sleep(50); + List laterDelays = verifyJobDelays(jobs); + + assertThat(laterDelays, + Matchers.contains(initialDelays.stream().map(Matchers::lessThan).collect(Collectors.toList()))); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + private List verifyJobDelays(List jobs) { + List delays = new ArrayList<>(jobs.size()); + Scheduler.ScheduledCancellable previous = null; + for (Scheduler.ScheduledCancellable job : jobs) { + if (previous != null) { + long previousDelay = previous.getDelay(TimeUnit.MILLISECONDS); + long delay = job.getDelay(TimeUnit.MILLISECONDS); + assertThat(delay, Matchers.lessThan(previousDelay)); + assertThat(job, Matchers.lessThan(previous)); + } + assertThat(job.getDelay(TimeUnit.SECONDS), Matchers.greaterThan(1L)); + assertThat(job.getDelay(TimeUnit.SECONDS), Matchers.lessThanOrEqualTo(30L)); + + delays.add(job.getDelay(TimeUnit.NANOSECONDS)); + previous = job; + } + + return delays; + } + + // simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests + public void testScheduledOnThreadPool() throws InterruptedException { + ThreadPool threadPool = new TestThreadPool("test"); + CountDownLatch missingExecutions = new CountDownLatch(ThreadPool.THREAD_POOL_TYPES.keySet().size()); + try { + ThreadPool.THREAD_POOL_TYPES.keySet() + .forEach(type -> + threadPool.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), type)); + + assertTrue(missingExecutions.await(30, TimeUnit.SECONDS)); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + // simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests + public void testScheduledOnScheduler() throws InterruptedException { + ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY); + Scheduler scheduler = (command, delay, name) -> + Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + + CountDownLatch missingExecutions = new CountDownLatch(1); + try { + scheduler.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME); + assertTrue(missingExecutions.await(30, TimeUnit.SECONDS)); + } finally { + Scheduler.terminate(executor, 10, TimeUnit.SECONDS); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java index a56db579cec80..e400292873411 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java @@ -32,7 +32,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; -import java.util.concurrent.ScheduledFuture; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -212,7 +211,7 @@ private CapturingThreadPool() { } @Override - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable task) { + public ScheduledCancellable schedule(Runnable task, TimeValue delay, String executor) { scheduledTasks.add(new Tuple<>(delay, task)); return null; } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index eb4e55853d5a1..f14afe3c36534 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -344,7 +344,7 @@ public ExecutorService executor(String name) { } @Override - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { final int NOT_STARTED = 0; final int STARTED = 1; final int CANCELLED = 2; @@ -364,7 +364,7 @@ public String toString() { } })); - return new ScheduledFuture() { + return new ScheduledCancellable() { @Override public long getDelay(TimeUnit unit) { throw new UnsupportedOperationException(); @@ -376,8 +376,7 @@ public int compareTo(Delayed o) { } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - assert mayInterruptIfRunning == false; + public boolean cancel() { return taskState.compareAndSet(NOT_STARTED, CANCELLED); } @@ -386,20 +385,6 @@ public boolean isCancelled() { return taskState.get() == CANCELLED; } - @Override - public boolean isDone() { - throw new UnsupportedOperationException(); - } - - @Override - public Object get() { - throw new UnsupportedOperationException(); - } - - @Override - public Object get(long timeout, TimeUnit unit) { - throw new UnsupportedOperationException(); - } }; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 469a4cfb4d55e..e3d7e72a0bb97 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -368,7 +368,7 @@ protected void doRun() throws IOException { runnable.run(); } else { requestsToSendWhenCleared.add(runnable); - threadPool.schedule(delay, ThreadPool.Names.GENERIC, runnable); + threadPool.schedule(runnable, delay, ThreadPool.Names.GENERIC); } } } diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java index c13c840377f5e..129ef89e88c9b 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -322,14 +321,14 @@ public void testThreadPoolSchedulesFutureTasks() { final ThreadPool threadPool = taskQueue.getThreadPool(); final long delayMillis = randomLongBetween(1, 100); - threadPool.schedule(TimeValue.timeValueMillis(delayMillis), GENERIC, () -> strings.add("deferred")); + threadPool.schedule(() -> strings.add("deferred"), TimeValue.timeValueMillis(delayMillis), GENERIC); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); - threadPool.schedule(TimeValue.ZERO, GENERIC, () -> strings.add("runnable")); + threadPool.schedule(() -> strings.add("runnable"), TimeValue.ZERO, GENERIC); assertTrue(taskQueue.hasRunnableTasks()); - threadPool.schedule(TimeValue.MINUS_ONE, GENERIC, () -> strings.add("also runnable")); + threadPool.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE, GENERIC); taskQueue.runAllTasks(); @@ -339,8 +338,8 @@ public void testThreadPoolSchedulesFutureTasks() { final long delayMillis1 = randomLongBetween(2, 100); final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1); - threadPool.schedule(TimeValue.timeValueMillis(delayMillis1), GENERIC, () -> strings.add("further deferred")); - threadPool.schedule(TimeValue.timeValueMillis(delayMillis2), GENERIC, () -> strings.add("not quite so deferred")); + threadPool.schedule(() -> strings.add("further deferred"), TimeValue.timeValueMillis(delayMillis1), GENERIC); + threadPool.schedule(() -> strings.add("not quite so deferred"), TimeValue.timeValueMillis(delayMillis2), GENERIC); assertFalse(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasDeferredTasks()); @@ -349,9 +348,10 @@ public void testThreadPoolSchedulesFutureTasks() { assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1)); final TimeValue cancelledDelay = TimeValue.timeValueMillis(randomLongBetween(1, 100)); - final ScheduledFuture future = threadPool.schedule(cancelledDelay, "", () -> strings.add("cancelled before execution")); + final Scheduler.Cancellable cancelledBeforeExecution = + threadPool.schedule(() -> strings.add("cancelled before execution"), cancelledDelay, ""); - future.cancel(false); + cancelledBeforeExecution.cancel(); taskQueue.runAllTasks(); assertThat(strings, containsInAnyOrder("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred")); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 40aa90dcab5e0..56538d395feda 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -300,7 +300,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); - threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); + threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME); } else { shardFollowNodeTask.markAsFailed(e); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 4af9e7c23a276..629127c454cef 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -98,7 +98,7 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR BiConsumer scheduler = (delay, task) -> { assert delay.millis() < 100 : "The delay should be kept to a minimum, so that this test does not take to long to run"; if (stopped.get() == false) { - threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); + threadPool.schedule(task, delay, ThreadPool.Names.GENERIC); } }; List receivedOperations = Collections.synchronizedList(new ArrayList<>()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 8a3e374a24c5a..8b05b618ba407 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -380,7 +380,7 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ); final String recordedLeaderIndexHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); - BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); + BiConsumer scheduler = (delay, task) -> threadPool.schedule(task, delay, ThreadPool.Names.GENERIC); AtomicBoolean stopped = new AtomicBoolean(false); LongSet fetchOperations = new LongHashSet(); return new ShardFollowNodeTask( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 5b9852ba4fddc..2f7f9241038a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -13,7 +13,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; @@ -21,7 +21,6 @@ import java.time.ZonedDateTime; import java.util.Objects; import java.util.Random; -import java.util.concurrent.ScheduledFuture; import java.util.function.Supplier; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -45,7 +44,7 @@ public class MlDailyMaintenanceService implements Releasable { */ private final Supplier schedulerProvider; - private volatile ScheduledFuture future; + private volatile Scheduler.Cancellable cancellable; MlDailyMaintenanceService(ThreadPool threadPool, Client client, Supplier scheduleProvider) { this.threadPool = Objects.requireNonNull(threadPool); @@ -87,13 +86,13 @@ public void start() { public void stop() { LOGGER.debug("Stopping ML daily maintenance service"); - if (future != null && future.isCancelled() == false) { - FutureUtils.cancel(future); + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); } } public boolean isStarted() { - return future != null; + return cancellable != null; } @Override @@ -103,7 +102,7 @@ public void close() { private void scheduleNext() { try { - future = threadPool.schedule(schedulerProvider.get(), ThreadPool.Names.GENERIC, this::triggerTasks); + cancellable = threadPool.schedule(this::triggerTasks, schedulerProvider.get(), ThreadPool.Names.GENERIC); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { LOGGER.debug("failed to schedule next maintenance task; shutting down", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index e004a718b13fa..409d15182d96a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -16,11 +16,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; @@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -153,7 +152,8 @@ public void isolateDatafeed(long allocationId) { // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running. private void innerRun(Holder holder, long startTime, Long endTime) { - holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { + holder.cancellable = + Scheduler.wrapAsCancellable(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -204,14 +204,14 @@ protected void doRun() { } } } - }); + })); } void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) { if (holder.isRunning() && !holder.isIsolated()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); - holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() { + holder.cancellable = threadPool.schedule(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -248,7 +248,7 @@ protected void doRun() { doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder); } } - }); + }, delay, MachineLearning.DATAFEED_THREAD_POOL_NAME); } } @@ -297,7 +297,7 @@ public class Holder { private final boolean autoCloseJob; private final ProblemTracker problemTracker; private final Consumer finishHandler; - volatile Future future; + volatile Scheduler.Cancellable cancellable; private volatile boolean isRelocating; Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, @@ -341,7 +341,9 @@ public void stop(String source, TimeValue timeout, Exception e) { logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId, datafeedJob.getJobId(), acquired); runningDatafeedsOnThisNode.remove(allocationId); - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); finishHandler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 7aa2d93f1201f..a71e06bddef67 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -10,11 +10,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.hamcrest.Matchers.is; @@ -46,8 +46,8 @@ public void setUpMocks() { }).when(executorService).execute(any(Runnable.class)); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); - ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); - when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture); + Scheduler.ScheduledCancellable scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 858f7e0f7d1c9..ff4160ffe4d19 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -48,7 +49,7 @@ import java.util.Collections; import java.util.Date; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -114,7 +115,7 @@ public void setUpTests() { ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); - return null; + return mock(Future.class); }).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); @@ -178,11 +179,11 @@ public void testStart_emptyDataCountException() throws Exception { int[] counter = new int[] {0}; doAnswer(invocationOnMock -> { if (counter[0]++ < 10) { - Runnable r = (Runnable) invocationOnMock.getArguments()[2]; + Runnable r = (Runnable) invocationOnMock.getArguments()[0]; currentTime += 600000; r.run(); } - return mock(ScheduledFuture.class); + return mock(Scheduler.ScheduledCancellable.class); }).when(threadPool).schedule(any(), any(), any()); when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); @@ -192,7 +193,7 @@ public void testStart_emptyDataCountException() throws Exception { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); datafeedManager.run(task, handler); - verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); + verify(threadPool, times(11)).schedule(any(), any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME)); verify(auditor, times(1)).warning(eq("job_id"), anyString()); } @@ -248,7 +249,7 @@ public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception verify(handler).accept(null); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false)); } else { - verify(threadPool, times(1)).schedule(eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); + verify(threadPool, times(1)).schedule(any(), eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME)); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 8c0f5da6366a0..86f408a42d43b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -457,7 +457,7 @@ public void testKill() throws TimeoutException { } private void setupScheduleDelayTime(TimeValue delay) { - when(threadPool.schedule(any(TimeValue.class), anyString(), any(Runnable.class))) - .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[2], delay.nanos(), TimeUnit.NANOSECONDS)); + when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString())) + .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS)); } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java index e6af45ee2bca0..571bfc7c1a964 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java @@ -13,8 +13,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.monitoring.MonitoringField; import org.joda.time.DateTime; @@ -22,7 +22,6 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ScheduledFuture; /** * {@code CleanerService} takes care of deleting old monitoring indices. @@ -57,7 +56,7 @@ public CleanerService(Settings settings, ClusterSettings clusterSettings, Thread @Override protected void doStart() { logger.debug("starting cleaning service"); - threadPool.schedule(executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName(), runnable); + threadPool.schedule(runnable, executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName()); logger.debug("cleaning service started"); } @@ -153,7 +152,7 @@ public interface Listener { */ class IndicesCleaner extends AbstractLifecycleRunnable { - private volatile ScheduledFuture future; + private volatile Scheduler.Cancellable cancellable; /** * Enable automatic logging and stopping of the runnable based on the {@link #lifecycle}. @@ -197,7 +196,7 @@ protected void onAfterInLifecycle() { logger.debug("scheduling next execution in [{}] seconds", delay.seconds()); try { - future = threadPool.schedule(delay, executorName(), this); + cancellable = threadPool.schedule(this, delay, executorName()); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { logger.debug("couldn't schedule new execution of the cleaner, executor is shutting down", e); @@ -215,13 +214,13 @@ public void onFailure(Exception e) { /** * Cancel/stop the cleaning service. *

- * This will kill any scheduled {@link #future} from running. It's possible that this will be executed concurrently with the + * This will kill any scheduled {@link #cancellable} from running. It's possible that this will be executed concurrently with the * {@link #onAfter() rescheduling code}, at which point it will be stopped during the next execution if the service is * stopped. */ public void cancel() { - if (future != null && future.isCancelled() == false) { - FutureUtils.cancel(future); + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java index d9245e4e22813..33ecf0c233a83 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java @@ -132,7 +132,7 @@ protected void doAuthenticate(UsernamePasswordToken token, ActionListener userActionList () -> sessionFactory.unauthenticatedSession(username, contextPreservingListener(new LdapSessionActionListener("lookup", username, sessionListener))), logger); threadPool.generic().execute(cancellableLdapRunnable); - threadPool.schedule(executionTimeout, Names.SAME, cancellableLdapRunnable::maybeTimeout); + threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, Names.SAME); } else { userActionListener.onResponse(null); }