Skip to content

Commit

Permalink
SimpleScheduler - fix CronTrigger#evaluate()
Browse files Browse the repository at this point in the history
- resolves quarkusio#17724

(cherry picked from commit ab144e2)
  • Loading branch information
mkouba authored and gsmet committed Jun 10, 2021
1 parent ce0c1e6 commit d65edc8
Showing 1 changed file with 25 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@Singleton
public class SimpleScheduler implements Scheduler {

private static final Logger LOGGER = Logger.getLogger(SimpleScheduler.class);
private static final Logger LOG = Logger.getLogger(SimpleScheduler.class);

// milliseconds
private static final long CHECK_PERIOD = 1000L;
Expand All @@ -65,10 +65,10 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule

if (!schedulerRuntimeConfig.enabled) {
this.scheduledExecutor = null;
LOGGER.info("Simple scheduler is disabled by config property and will not be started");
LOG.info("Simple scheduler is disabled by config property and will not be started");
} else if (context.getScheduledMethods().isEmpty()) {
this.scheduledExecutor = null;
LOGGER.info("No scheduled business methods found - Simple scheduler will not be started");
LOG.info("No scheduled business methods found - Simple scheduler will not be started");
} else {
this.scheduledExecutor = new JBossScheduledThreadPoolExecutor(1, new Runnable() {
@Override
Expand Down Expand Up @@ -118,16 +118,17 @@ void stop() {
scheduledExecutor.shutdownNow();
}
} catch (Exception e) {
LOGGER.warn("Unable to shutdown the scheduler executor", e);
LOG.warn("Unable to shutdown the scheduler executor", e);
}
}

void checkTriggers() {
if (!running) {
LOGGER.trace("Skip all triggers - scheduler paused");
LOG.trace("Skip all triggers - scheduler paused");
return;
}
ZonedDateTime now = ZonedDateTime.now();
LOG.tracef("Check triggers at %s", now);
for (ScheduledTask task : scheduledTasks) {
task.execute(now, executor);
}
Expand All @@ -136,7 +137,7 @@ void checkTriggers() {
@Override
public void pause() {
if (!enabled) {
LOGGER.warn("Scheduler is disabled and cannot be paused");
LOG.warn("Scheduler is disabled and cannot be paused");
} else {
running = false;
}
Expand All @@ -146,7 +147,7 @@ public void pause() {
public void pause(String identity) {
Objects.requireNonNull(identity, "Cannot pause - identity is null");
if (identity.isEmpty()) {
LOGGER.warn("Cannot pause - identity is empty");
LOG.warn("Cannot pause - identity is empty");
return;
}
String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity);
Expand All @@ -161,7 +162,7 @@ public void pause(String identity) {
@Override
public void resume() {
if (!enabled) {
LOGGER.warn("Scheduler is disabled and cannot be resumed");
LOG.warn("Scheduler is disabled and cannot be resumed");
} else {
running = true;
}
Expand All @@ -171,7 +172,7 @@ public void resume() {
public void resume(String identity) {
Objects.requireNonNull(identity, "Cannot resume - identity is null");
if (identity.isEmpty()) {
LOGGER.warn("Cannot resume - identity is empty");
LOG.warn("Cannot resume - identity is empty");
return;
}
String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity);
Expand Down Expand Up @@ -250,13 +251,12 @@ public void run() {
try {
invoker.invoke(new SimpleScheduledExecution(now, scheduledFireTime, trigger));
} catch (Throwable t) {
LOGGER.errorf(t, "Error occured while executing task for trigger %s", trigger);
LOG.errorf(t, "Error occured while executing task for trigger %s", trigger);
}
}
});
LOGGER.debugf("Executing scheduled task for trigger %s", trigger);
} catch (RejectedExecutionException e) {
LOGGER.warnf("Rejected execution of a scheduled task for trigger %s", trigger);
LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger);
}
}
}
Expand All @@ -268,6 +268,7 @@ static abstract class SimpleTrigger implements Trigger {
private final String id;
private volatile boolean running;
protected final ZonedDateTime start;
protected volatile ZonedDateTime lastFireTime;

public SimpleTrigger(String id, ZonedDateTime start) {
this.id = id;
Expand Down Expand Up @@ -298,8 +299,8 @@ public synchronized void setRunning(boolean running) {

static class IntervalTrigger extends SimpleTrigger {

// milliseconds
private final long interval;
private volatile ZonedDateTime lastFireTime;

public IntervalTrigger(String id, ZonedDateTime start, long interval) {
super(id, start);
Expand All @@ -316,9 +317,11 @@ ZonedDateTime evaluate(ZonedDateTime now) {
lastFireTime = now.truncatedTo(ChronoUnit.SECONDS);
return now;
}
if (ChronoUnit.MILLIS.between(lastFireTime, now) >= interval) {
long diff = ChronoUnit.MILLIS.between(lastFireTime, now);
if (diff >= interval) {
ZonedDateTime scheduledFireTime = lastFireTime.plus(Duration.ofMillis(interval));
lastFireTime = now.truncatedTo(ChronoUnit.SECONDS);
LOG.tracef("%s fired, diff=%s ms", this, diff);
return scheduledFireTime;
}
return null;
Expand All @@ -345,16 +348,14 @@ public String toString() {

static class CronTrigger extends SimpleTrigger {

// microseconds
private static final long DIFF_THRESHOLD = CHECK_PERIOD * 1000;

private final Cron cron;
private final ExecutionTime executionTime;

public CronTrigger(String id, ZonedDateTime start, Cron cron) {
super(id, start);
this.cron = cron;
this.executionTime = ExecutionTime.forCron(cron);
this.lastFireTime = ZonedDateTime.now();
}

@Override
Expand All @@ -373,17 +374,13 @@ ZonedDateTime evaluate(ZonedDateTime now) {
if (now.isBefore(start)) {
return null;
}
Optional<ZonedDateTime> lastFireTime = executionTime.lastExecution(now);
if (lastFireTime.isPresent()) {
ZonedDateTime trunc = lastFireTime.get().truncatedTo(ChronoUnit.SECONDS);
if (now.isBefore(trunc)) {
return null;
}
// Use microseconds precision to workaround incompatibility between jdk8 and jdk9+
long diff = ChronoUnit.MICROS.between(trunc, now);
if (diff <= DIFF_THRESHOLD) {
LOGGER.debugf("%s fired, diff=%s μs", this, diff);
return trunc;
Optional<ZonedDateTime> lastExecution = executionTime.lastExecution(now);
if (lastExecution.isPresent()) {
ZonedDateTime lastTruncated = lastExecution.get().truncatedTo(ChronoUnit.SECONDS);
if (now.isAfter(lastTruncated) && lastFireTime.isBefore(lastTruncated)) {
LOG.tracef("%s fired, last=", this, lastTruncated);
lastFireTime = now;
return lastTruncated;
}
}
return null;
Expand Down

0 comments on commit d65edc8

Please sign in to comment.