Skip to content

Commit

Permalink
Ensure cancelled jobs do not continue to run (elastic#63762)
Browse files Browse the repository at this point in the history
This commit ensures that jobs within the SchedulerEngine do not
continue to run after they are cancelled. There was no synchronization
between the cancel method of an ActiveSchedule and the run method, so
an actively running schedule would go ahead and reschedule itself even
if the cancel method had been called.

This commit adds synchronization between cancelling and the scheduling
of the next run to ensure that the job is cancelled. In real life
scenarios this could manifest as a job running multiple times for
SLM. This could happen if a job had been triggered and was cancelled
prior to completing its run such as if the node was no longer the
master node or if SLM was stopping/stopped.

Closes elastic#63754
  • Loading branch information
jaymode authored Oct 15, 2020
1 parent d126afb commit 879dd5a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ class ActiveSchedule implements Runnable {
private final Schedule schedule;
private final long startTime;

private volatile ScheduledFuture<?> future;
private volatile long scheduledTime;
private ScheduledFuture<?> future;
private long scheduledTime;

ActiveSchedule(String name, Schedule schedule, long startTime) {
this.name = name;
Expand Down Expand Up @@ -228,7 +228,11 @@ private void scheduleNextRun(long currentTime) {
if (scheduledTime != -1) {
long delay = Math.max(0, scheduledTime - currentTime);
try {
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
synchronized (this) {
if (future == null || future.isCancelled() == false) {
future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
}
} catch (RejectedExecutionException e) {
// ignoring rejections if the scheduler has been shut down already
if (scheduler.isShutdown() == false) {
Expand All @@ -238,7 +242,7 @@ private void scheduleNextRun(long currentTime) {
}
}

public void cancel() {
public synchronized void cancel() {
FutureUtils.cancel(future);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job;
import org.mockito.ArgumentCaptor;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -150,6 +153,37 @@ public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkippe
}
}

public void testCancellingDuringRunPreventsRescheduling() throws Exception {
final CountDownLatch jobRunningLatch = new CountDownLatch(1);
final CountDownLatch listenerLatch = new CountDownLatch(1);
final AtomicInteger calledCount = new AtomicInteger(0);
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC());
final String jobId = randomAlphaOfLength(4);
try {
engine.register(event -> {
assertThat(event.getJobName(), is(jobId));
calledCount.incrementAndGet();
jobRunningLatch.countDown();
try {
listenerLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
engine.add(new Job(jobId, ((startTime, now) -> 0)));

jobRunningLatch.await();
final int called = calledCount.get();
assertEquals(1, called);
engine.remove(jobId);
listenerLatch.countDown();

assertBusy(() -> assertEquals(called, calledCount.get()), 5, TimeUnit.MILLISECONDS);
} finally {
engine.stop();
}
}

private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
Expand Down

0 comments on commit 879dd5a

Please sign in to comment.