Skip to content

Commit

Permalink
Fix job scheduling for same scheduled time (elastic#64501)
Browse files Browse the repository at this point in the history
The SchedulerEngine used by SLM uses a custom runnable that will
schedule itself for its next execution if there is one to run. For the
majority of jobs, this scheduling could be many hours or days away. Due
to the scheduling so far in advance, there is a chance that time drifts
on the machine or even that time varies core to core so there is no
guarantee that the job actually runs on or after the scheduled time.
This can cause some jobs to reschedule themselves for the same
scheduled time even if they ran only a millisecond prior to the
scheduled time, which causes unexpected actions to be taken such as
what appears as duplicated snapshots.

This change resolves this by checking the triggered time against the
scheduled time and using the appropriate value to ensure that we do
not have unexpected job runs.

Relates elastic#63754
  • Loading branch information
jaymode authored Nov 4, 2020
1 parent 281ae6c commit d451bd1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,19 @@ protected void notifyListeners(final String name, final long triggeredTime, fina
}
}

// for testing
ActiveSchedule getSchedule(String jobId) {
return schedules.get(jobId);
}

class ActiveSchedule implements Runnable {

private final String name;
private final Schedule schedule;
private final long startTime;

private ScheduledFuture<?> future;
private long scheduledTime;
private long scheduledTime = -1;

ActiveSchedule(String name, Schedule schedule, long startTime) {
this.name = name;
Expand Down Expand Up @@ -223,10 +228,10 @@ public void run() {
scheduleNextRun(triggeredTime);
}

private void scheduleNextRun(long currentTime) {
this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, currentTime);
private void scheduleNextRun(long triggeredTime) {
this.scheduledTime = computeNextScheduledTime(triggeredTime);
if (scheduledTime != -1) {
long delay = Math.max(0, scheduledTime - currentTime);
long delay = Math.max(0, scheduledTime - clock.millis());
try {
synchronized (this) {
if (future == null || future.isCancelled() == false) {
Expand All @@ -242,6 +247,28 @@ private void scheduleNextRun(long currentTime) {
}
}

// for testing
long getScheduledTime() {
return scheduledTime;
}

long computeNextScheduledTime(long triggeredTime) {
// multiple time sources + multiple cpus + ntp + VMs means you can't trust time ever!
// scheduling happens far enough in advance in most cases that time can drift and we
// may execute at some point before the scheduled time. There can also be time differences
// between the CPU cores and/or the clock used by the threadpool and that used by this class
// for scheduling. Regardless, we shouldn't reschedule to execute again until after the
// scheduled time.
final long scheduleAfterTime;
if (scheduledTime != -1 && triggeredTime < scheduledTime) {
scheduleAfterTime = scheduledTime;
} else {
scheduleAfterTime = triggeredTime;
}

return schedule.nextScheduledTimeAfter(startTime, scheduleAfterTime);
}

public synchronized void cancel() {
FutureUtils.cancel(future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.ActiveSchedule;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job;
import org.mockito.ArgumentCaptor;

import java.time.Clock;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -184,6 +186,27 @@ public void testCancellingDuringRunPreventsRescheduling() throws Exception {
}
}

public void testNextScheduledTimeAfterCurrentScheduledTime() throws Exception {
final Clock clock = Clock.fixed(Clock.systemUTC().instant(), ZoneId.of("UTC"));
final long oneHourMillis = TimeUnit.HOURS.toMillis(1L);
final String jobId = randomAlphaOfLength(4);
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, clock);
try {
engine.add(new Job(jobId, ((startTime, now) -> now + oneHourMillis)));

ActiveSchedule activeSchedule = engine.getSchedule(jobId);
assertNotNull(activeSchedule);
assertEquals(clock.millis() + oneHourMillis, activeSchedule.getScheduledTime());

assertEquals(clock.millis() + oneHourMillis + oneHourMillis,
activeSchedule.computeNextScheduledTime(clock.millis() - randomIntBetween(1, 999)));
assertEquals(clock.millis() + oneHourMillis + oneHourMillis,
activeSchedule.computeNextScheduledTime(clock.millis() + TimeUnit.SECONDS.toMillis(10L)));
} finally {
engine.stop();
}
}

private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
Expand Down

0 comments on commit d451bd1

Please sign in to comment.