From 879dd5a99c7a257a5a7eb8409a879985c8b9985a Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Thu, 15 Oct 2020 13:14:26 -0600 Subject: [PATCH] Ensure cancelled jobs do not continue to run (#63762) This commit ensures that jobs within the SchedulerEngine do not continue to run after they are cancelled. There was no synchronization between the cancel method of an ActiveSchedule and the run method, so an actively running schedule would go ahead and reschedule itself even if the cancel method had been called. This commit adds synchronization between cancelling and the scheduling of the next run to ensure that the job is cancelled. In real life scenarios this could manifest as a job running multiple times for SLM. This could happen if a job had been triggered and was cancelled prior to completing its run such as if the node was no longer the master node or if SLM was stopping/stopped. Closes #63754 --- .../xpack/core/scheduler/SchedulerEngine.java | 12 ++++--- .../core/scheduler/SchedulerEngineTests.java | 34 +++++++++++++++++++ 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index 6c4ccdbd43ee5..06b03636d05d6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -194,8 +194,8 @@ class ActiveSchedule implements Runnable { private final Schedule schedule; private final long startTime; - private volatile ScheduledFuture future; - private volatile long scheduledTime; + private ScheduledFuture future; + private long scheduledTime; ActiveSchedule(String name, Schedule schedule, long startTime) { this.name = name; @@ -228,7 +228,11 @@ private void scheduleNextRun(long currentTime) { if (scheduledTime != -1) { long delay = Math.max(0, scheduledTime - currentTime); try { - future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + synchronized (this) { + if (future == null || future.isCancelled() == false) { + future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); + } + } } catch (RejectedExecutionException e) { // ignoring rejections if the scheduler has been shut down already if (scheduler.isShutdown() == false) { @@ -238,7 +242,7 @@ private void scheduleNextRun(long currentTime) { } } - public void cancel() { + public synchronized void cancel() { FutureUtils.cancel(future); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java index 5ab7b805cc1f7..63814773d9ccd 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job; import org.mockito.ArgumentCaptor; import java.time.Clock; @@ -18,6 +19,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -25,6 +27,7 @@ import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -150,6 +153,37 @@ public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkippe } } + public void testCancellingDuringRunPreventsRescheduling() throws Exception { + final CountDownLatch jobRunningLatch = new CountDownLatch(1); + final CountDownLatch listenerLatch = new CountDownLatch(1); + final AtomicInteger calledCount = new AtomicInteger(0); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC()); + final String jobId = randomAlphaOfLength(4); + try { + engine.register(event -> { + assertThat(event.getJobName(), is(jobId)); + calledCount.incrementAndGet(); + jobRunningLatch.countDown(); + try { + listenerLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + engine.add(new Job(jobId, ((startTime, now) -> 0))); + + jobRunningLatch.await(); + final int called = calledCount.get(); + assertEquals(1, called); + engine.remove(jobId); + listenerLatch.countDown(); + + assertBusy(() -> assertEquals(called, calledCount.get()), 5, TimeUnit.MILLISECONDS); + } finally { + engine.stop(); + } + } + private void assertFailedListenerLogMessage(Logger mockLogger, int times) { final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class); final ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class);