diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionProceedTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionProceedTest.java new file mode 100644 index 0000000000000..5edf0fd0ed2d9 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionProceedTest.java @@ -0,0 +1,53 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class ConcurrentExecutionProceedTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(Jobs.class)); + + @Test + public void testExecution() { + try { + // Wait until Jobs#concurrent() is executed 3x and skipped 0x + if (Jobs.START_LATCH.await(10, TimeUnit.SECONDS)) { + // Unblock all executions + Jobs.BLOCKING_LATCH.countDown(); + } else { + fail("Jobs were not executed in 10 seconds!"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + static class Jobs { + + static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1); + + static final CountDownLatch START_LATCH = new CountDownLatch(3); + + @Scheduled(every = "1s") + void concurrent() throws InterruptedException { + START_LATCH.countDown(); + if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("concurrent() execution blocked too long..."); + } + } + } +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionSkipTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionSkipTest.java new file mode 100644 index 0000000000000..1e63ad64a84d5 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionSkipTest.java @@ -0,0 +1,66 @@ +package io.quarkus.quartz.test; + +import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.event.Observes; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.SkippedExecution; +import io.quarkus.test.QuarkusUnitTest; + +public class ConcurrentExecutionSkipTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(Jobs.class)); + + @Test + public void testExecution() { + try { + // Wait until Jobs#nonconcurrent() is executed 1x and skipped 1x + if (Jobs.SKIPPED_LATCH.await(10, TimeUnit.SECONDS)) { + // Exactly one job is blocked + assertEquals(1, Jobs.COUNTER.get()); + // Unblock all executions + Jobs.BLOCKING_LATCH.countDown(); + } else { + fail("Jobs were not executed in 10 seconds!"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + static class Jobs { + + static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1); + + static final AtomicInteger COUNTER = new AtomicInteger(0); + static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1); + + @Scheduled(every = "1s", concurrentExecution = SKIP) + void nonconcurrent() throws InterruptedException { + COUNTER.incrementAndGet(); + if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + } + + void onSkip(@Observes SkippedExecution event) { + SKIPPED_LATCH.countDown(); + } + } +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionTest.java deleted file mode 100644 index 4176e6a0dadff..0000000000000 --- a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionTest.java +++ /dev/null @@ -1,57 +0,0 @@ -package io.quarkus.quartz.test; - -import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.jboss.shrinkwrap.api.ShrinkWrap; -import org.jboss.shrinkwrap.api.spec.JavaArchive; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.quarkus.scheduler.Scheduled; -import io.quarkus.test.QuarkusUnitTest; - -public class ConcurrentExecutionTest { - - @RegisterExtension - static final QuarkusUnitTest test = new QuarkusUnitTest() - .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) - .addClasses(Jobs.class)); - - @Test - public void testNonconcurrentExecution() throws InterruptedException { - if (Jobs.LATCH.await(5, TimeUnit.SECONDS)) { - assertEquals(1, Jobs.COUNTER.get()); - } else { - fail("Scheduled methods not executed"); - } - } - - static class Jobs { - - static final CountDownLatch LATCH = new CountDownLatch(3); - static final AtomicInteger COUNTER = new AtomicInteger(); - - @Scheduled(every = "1s") - void concurrent() { - LATCH.countDown(); - } - - @Scheduled(every = "1s", concurrentExecution = SKIP) - void nonconcurrent() throws InterruptedException { - if (LATCH.getCount() == 0) { - // we are already done with our test, don't increment the counter anymore - return; - } - COUNTER.incrementAndGet(); - if (!LATCH.await(5, TimeUnit.SECONDS)) { - throw new IllegalStateException(""); - } - } - } -} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzScheduler.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzScheduler.java index f99d8fc87ab40..6f5bf60aa6a78 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzScheduler.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzScheduler.java @@ -10,6 +10,7 @@ import javax.annotation.Priority; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.context.BeforeDestroyed; +import javax.enterprise.event.Event; import javax.enterprise.event.Observes; import javax.enterprise.inject.Produces; import javax.inject.Singleton; @@ -47,6 +48,7 @@ import io.quarkus.scheduler.Scheduled.ConcurrentExecution; import io.quarkus.scheduler.ScheduledExecution; import io.quarkus.scheduler.Scheduler; +import io.quarkus.scheduler.SkippedExecution; import io.quarkus.scheduler.Trigger; import io.quarkus.scheduler.runtime.ScheduledInvoker; import io.quarkus.scheduler.runtime.ScheduledMethodMetadata; @@ -75,7 +77,7 @@ org.quartz.Scheduler produceQuartzScheduler() { } public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Config config, - SchedulerRuntimeConfig schedulerRuntimeConfig) { + SchedulerRuntimeConfig schedulerRuntimeConfig, Event skippedExecutionEvent) { enabled = schedulerRuntimeConfig.enabled; if (!enabled) { LOGGER.info("Quartz scheduler is disabled by config property and will not be started"); @@ -117,7 +119,7 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Co } ScheduledInvoker invoker = context.createInvoker(method.getInvokerClassName()); if (scheduled.concurrentExecution() == ConcurrentExecution.SKIP) { - invoker = new SkipConcurrentExecutionInvoker(invoker); + invoker = new SkipConcurrentExecutionInvoker(invoker, skippedExecutionEvent); } invokers.put(identity, invoker); diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionProceedTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionProceedTest.java new file mode 100644 index 0000000000000..952a5d3a950ab --- /dev/null +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionProceedTest.java @@ -0,0 +1,53 @@ +package io.quarkus.scheduler.test; + +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class ConcurrentExecutionProceedTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(Jobs.class)); + + @Test + public void testExecution() { + try { + // Wait until Jobs#concurrent() is executed 3x and skipped 0x + if (Jobs.START_LATCH.await(10, TimeUnit.SECONDS)) { + // Unblock all executions + Jobs.BLOCKING_LATCH.countDown(); + } else { + fail("Jobs were not executed in 10 seconds!"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + static class Jobs { + + static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1); + + static final CountDownLatch START_LATCH = new CountDownLatch(3); + + @Scheduled(every = "1s") + void concurrent() throws InterruptedException { + START_LATCH.countDown(); + if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("concurrent() execution blocked too long..."); + } + } + } +} diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionSkipTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionSkipTest.java new file mode 100644 index 0000000000000..5ee890709c1f8 --- /dev/null +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionSkipTest.java @@ -0,0 +1,66 @@ +package io.quarkus.scheduler.test; + +import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.event.Observes; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.SkippedExecution; +import io.quarkus.test.QuarkusUnitTest; + +public class ConcurrentExecutionSkipTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(Jobs.class)); + + @Test + public void testExecution() { + try { + // Wait until Jobs#nonconcurrent() is executed 1x and skipped 1x + if (Jobs.SKIPPED_LATCH.await(10, TimeUnit.SECONDS)) { + // Exactly one job is blocked + assertEquals(1, Jobs.COUNTER.get()); + // Unblock all executions + Jobs.BLOCKING_LATCH.countDown(); + } else { + fail("Jobs were not executed in 10 seconds!"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + static class Jobs { + + static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1); + + static final AtomicInteger COUNTER = new AtomicInteger(0); + static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1); + + @Scheduled(every = "1s", concurrentExecution = SKIP) + void nonconcurrent() throws InterruptedException { + COUNTER.incrementAndGet(); + if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + } + + void onSkip(@Observes SkippedExecution event) { + SKIPPED_LATCH.countDown(); + } + } +} diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionTest.java deleted file mode 100644 index 5dc9536544fb1..0000000000000 --- a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionTest.java +++ /dev/null @@ -1,57 +0,0 @@ -package io.quarkus.scheduler.test; - -import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.jboss.shrinkwrap.api.ShrinkWrap; -import org.jboss.shrinkwrap.api.spec.JavaArchive; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.quarkus.scheduler.Scheduled; -import io.quarkus.test.QuarkusUnitTest; - -public class ConcurrentExecutionTest { - - @RegisterExtension - static final QuarkusUnitTest test = new QuarkusUnitTest() - .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) - .addClasses(Jobs.class)); - - @Test - public void testNonconcurrentExecution() throws InterruptedException { - if (Jobs.LATCH.await(5, TimeUnit.SECONDS)) { - assertEquals(1, Jobs.COUNTER.get()); - } else { - fail("Scheduled methods not executed"); - } - } - - static class Jobs { - - static final CountDownLatch LATCH = new CountDownLatch(3); - static final AtomicInteger COUNTER = new AtomicInteger(); - - @Scheduled(every = "1s") - void concurrent() { - LATCH.countDown(); - } - - @Scheduled(every = "1s", concurrentExecution = SKIP) - void nonconcurrent() throws InterruptedException { - if (LATCH.getCount() == 0) { - // we are already done with our test, don't increment the counter anymore - return; - } - COUNTER.incrementAndGet(); - if (!LATCH.await(5, TimeUnit.SECONDS)) { - throw new IllegalStateException(""); - } - } - } -} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/SkippedExecution.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/SkippedExecution.java new file mode 100644 index 0000000000000..a31e13a1082fe --- /dev/null +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/SkippedExecution.java @@ -0,0 +1,21 @@ +package io.quarkus.scheduler; + +import java.time.Instant; + +/** + * A CDI event that is fired synchronously and asynchronously when a concurrent execution of a scheduled method is skipped. + * + * @see io.quarkus.scheduler.Scheduled.ConcurrentExecution#SKIP + */ +public class SkippedExecution { + + public final String triggerId; + + public final Instant fireTime; + + public SkippedExecution(String triggerId, Instant fireTime) { + this.triggerId = triggerId; + this.fireTime = fireTime; + } + +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index fc22d86d9cd9b..d5c1aea3e6d8c 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -15,6 +15,7 @@ import javax.annotation.PreDestroy; import javax.annotation.Priority; +import javax.enterprise.event.Event; import javax.enterprise.event.Observes; import javax.enterprise.inject.Typed; import javax.inject.Singleton; @@ -36,6 +37,7 @@ import io.quarkus.scheduler.Scheduled.ConcurrentExecution; import io.quarkus.scheduler.ScheduledExecution; import io.quarkus.scheduler.Scheduler; +import io.quarkus.scheduler.SkippedExecution; import io.quarkus.scheduler.Trigger; @Typed(Scheduler.class) @@ -53,7 +55,8 @@ public class SimpleScheduler implements Scheduler { private final List scheduledTasks; private final boolean enabled; - public SimpleScheduler(SchedulerContext context, Config config, SchedulerRuntimeConfig schedulerRuntimeConfig) { + public SimpleScheduler(SchedulerContext context, Config config, SchedulerRuntimeConfig schedulerRuntimeConfig, + Event skippedExecutionEvent) { this.running = true; this.enabled = schedulerRuntimeConfig.enabled; this.scheduledTasks = new ArrayList<>(); @@ -84,7 +87,7 @@ public void run() { config); ScheduledInvoker invoker = context.createInvoker(method.getInvokerClassName()); if (scheduled.concurrentExecution() == ConcurrentExecution.SKIP) { - invoker = new SkipConcurrentExecutionInvoker(invoker); + invoker = new SkipConcurrentExecutionInvoker(invoker, skippedExecutionEvent); } scheduledTasks.add(new ScheduledTask(trigger, invoker)); } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java index 913976126c120..9ece757b62010 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java @@ -2,10 +2,13 @@ import java.util.concurrent.atomic.AtomicBoolean; +import javax.enterprise.event.Event; + import org.jboss.logging.Logger; import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.ScheduledExecution; +import io.quarkus.scheduler.SkippedExecution; /** * A scheduled invoker wrapper that skips concurrent executions. @@ -19,10 +22,12 @@ public final class SkipConcurrentExecutionInvoker implements ScheduledInvoker { private final AtomicBoolean running; private final ScheduledInvoker delegate; + private final Event event; - public SkipConcurrentExecutionInvoker(ScheduledInvoker delegate) { + public SkipConcurrentExecutionInvoker(ScheduledInvoker delegate, Event event) { this.running = new AtomicBoolean(false); this.delegate = delegate; + this.event = event; } @Override @@ -35,6 +40,9 @@ public void invoke(ScheduledExecution execution) { } } else { LOGGER.debugf("Skipped scheduled invoker execution: %s", delegate.getClass().getName()); + SkippedExecution payload = new SkippedExecution(execution.getTrigger().getId(), execution.getFireTime()); + event.fire(payload); + event.fireAsync(payload); } }