From d65edc843373885b353236e80b698d05d853e549 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Tue, 8 Jun 2021 11:55:10 +0200 Subject: [PATCH] SimpleScheduler - fix CronTrigger#evaluate() - resolves #17724 (cherry picked from commit ab144e268aa246a11f192a6fa56d053111e81ddb) --- .../scheduler/runtime/SimpleScheduler.java | 53 +++++++++---------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index 0d3e85c1db933..b82838afa8878 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -45,7 +45,7 @@ @Singleton public class SimpleScheduler implements Scheduler { - private static final Logger LOGGER = Logger.getLogger(SimpleScheduler.class); + private static final Logger LOG = Logger.getLogger(SimpleScheduler.class); // milliseconds private static final long CHECK_PERIOD = 1000L; @@ -65,10 +65,10 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule if (!schedulerRuntimeConfig.enabled) { this.scheduledExecutor = null; - LOGGER.info("Simple scheduler is disabled by config property and will not be started"); + LOG.info("Simple scheduler is disabled by config property and will not be started"); } else if (context.getScheduledMethods().isEmpty()) { this.scheduledExecutor = null; - LOGGER.info("No scheduled business methods found - Simple scheduler will not be started"); + LOG.info("No scheduled business methods found - Simple scheduler will not be started"); } else { this.scheduledExecutor = new JBossScheduledThreadPoolExecutor(1, new Runnable() { @Override @@ -118,16 +118,17 @@ void stop() { scheduledExecutor.shutdownNow(); } } catch (Exception e) { - LOGGER.warn("Unable to shutdown the scheduler executor", e); + LOG.warn("Unable to shutdown the scheduler executor", e); } } void checkTriggers() { if (!running) { - LOGGER.trace("Skip all triggers - scheduler paused"); + LOG.trace("Skip all triggers - scheduler paused"); return; } ZonedDateTime now = ZonedDateTime.now(); + LOG.tracef("Check triggers at %s", now); for (ScheduledTask task : scheduledTasks) { task.execute(now, executor); } @@ -136,7 +137,7 @@ void checkTriggers() { @Override public void pause() { if (!enabled) { - LOGGER.warn("Scheduler is disabled and cannot be paused"); + LOG.warn("Scheduler is disabled and cannot be paused"); } else { running = false; } @@ -146,7 +147,7 @@ public void pause() { public void pause(String identity) { Objects.requireNonNull(identity, "Cannot pause - identity is null"); if (identity.isEmpty()) { - LOGGER.warn("Cannot pause - identity is empty"); + LOG.warn("Cannot pause - identity is empty"); return; } String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity); @@ -161,7 +162,7 @@ public void pause(String identity) { @Override public void resume() { if (!enabled) { - LOGGER.warn("Scheduler is disabled and cannot be resumed"); + LOG.warn("Scheduler is disabled and cannot be resumed"); } else { running = true; } @@ -171,7 +172,7 @@ public void resume() { public void resume(String identity) { Objects.requireNonNull(identity, "Cannot resume - identity is null"); if (identity.isEmpty()) { - LOGGER.warn("Cannot resume - identity is empty"); + LOG.warn("Cannot resume - identity is empty"); return; } String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity); @@ -250,13 +251,12 @@ public void run() { try { invoker.invoke(new SimpleScheduledExecution(now, scheduledFireTime, trigger)); } catch (Throwable t) { - LOGGER.errorf(t, "Error occured while executing task for trigger %s", trigger); + LOG.errorf(t, "Error occured while executing task for trigger %s", trigger); } } }); - LOGGER.debugf("Executing scheduled task for trigger %s", trigger); } catch (RejectedExecutionException e) { - LOGGER.warnf("Rejected execution of a scheduled task for trigger %s", trigger); + LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger); } } } @@ -268,6 +268,7 @@ static abstract class SimpleTrigger implements Trigger { private final String id; private volatile boolean running; protected final ZonedDateTime start; + protected volatile ZonedDateTime lastFireTime; public SimpleTrigger(String id, ZonedDateTime start) { this.id = id; @@ -298,8 +299,8 @@ public synchronized void setRunning(boolean running) { static class IntervalTrigger extends SimpleTrigger { + // milliseconds private final long interval; - private volatile ZonedDateTime lastFireTime; public IntervalTrigger(String id, ZonedDateTime start, long interval) { super(id, start); @@ -316,9 +317,11 @@ ZonedDateTime evaluate(ZonedDateTime now) { lastFireTime = now.truncatedTo(ChronoUnit.SECONDS); return now; } - if (ChronoUnit.MILLIS.between(lastFireTime, now) >= interval) { + long diff = ChronoUnit.MILLIS.between(lastFireTime, now); + if (diff >= interval) { ZonedDateTime scheduledFireTime = lastFireTime.plus(Duration.ofMillis(interval)); lastFireTime = now.truncatedTo(ChronoUnit.SECONDS); + LOG.tracef("%s fired, diff=%s ms", this, diff); return scheduledFireTime; } return null; @@ -345,9 +348,6 @@ public String toString() { static class CronTrigger extends SimpleTrigger { - // microseconds - private static final long DIFF_THRESHOLD = CHECK_PERIOD * 1000; - private final Cron cron; private final ExecutionTime executionTime; @@ -355,6 +355,7 @@ public CronTrigger(String id, ZonedDateTime start, Cron cron) { super(id, start); this.cron = cron; this.executionTime = ExecutionTime.forCron(cron); + this.lastFireTime = ZonedDateTime.now(); } @Override @@ -373,17 +374,13 @@ ZonedDateTime evaluate(ZonedDateTime now) { if (now.isBefore(start)) { return null; } - Optional lastFireTime = executionTime.lastExecution(now); - if (lastFireTime.isPresent()) { - ZonedDateTime trunc = lastFireTime.get().truncatedTo(ChronoUnit.SECONDS); - if (now.isBefore(trunc)) { - return null; - } - // Use microseconds precision to workaround incompatibility between jdk8 and jdk9+ - long diff = ChronoUnit.MICROS.between(trunc, now); - if (diff <= DIFF_THRESHOLD) { - LOGGER.debugf("%s fired, diff=%s μs", this, diff); - return trunc; + Optional lastExecution = executionTime.lastExecution(now); + if (lastExecution.isPresent()) { + ZonedDateTime lastTruncated = lastExecution.get().truncatedTo(ChronoUnit.SECONDS); + if (now.isAfter(lastTruncated) && lastFireTime.isBefore(lastTruncated)) { + LOG.tracef("%s fired, last=", this, lastTruncated); + lastFireTime = now; + return lastTruncated; } } return null;