From 176649afb2bf8471a318a046a369dd6eaa0a5293 Mon Sep 17 00:00:00 2001 From: Matej Novotny Date: Tue, 23 Jul 2024 11:31:33 +0200 Subject: [PATCH] Quartz - allow bean based jobs to be interruptable --- .../programmatic/InterruptableJobTest.java | 137 ++++++++++++++++++ .../quarkus/quartz/runtime/CdiAwareJob.java | 22 ++- 2 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/InterruptableJobTest.java diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/InterruptableJobTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/InterruptableJobTest.java new file mode 100644 index 0000000000000..f7226ab5d5d4b --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/InterruptableJobTest.java @@ -0,0 +1,137 @@ +package io.quarkus.quartz.test.programmatic; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.quartz.InterruptableJob; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.UnableToInterruptJobException; + +import io.quarkus.test.QuarkusUnitTest; + +public class InterruptableJobTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyJob.class) + .addAsResource(new StringAsset("quarkus.scheduler.start-mode=forced"), + "application.properties")); + + @Inject + Scheduler scheduler; + + static final CountDownLatch INTERRUPT_LATCH = new CountDownLatch(1); + static final CountDownLatch EXECUTE_LATCH = new CountDownLatch(1); + + static final CountDownLatch NON_INTERRUPTABLE_EXECUTE_LATCH = new CountDownLatch(1); + static final CountDownLatch NON_INTERRUPTABLE_HOLD_LATCH = new CountDownLatch(1); + + @Test + public void testInterruptableJob() throws InterruptedException { + + String jobKey = "myJob"; + JobKey key = new JobKey(jobKey); + Trigger trigger = TriggerBuilder.newTrigger() + .startNow() + .build(); + + JobDetail job = JobBuilder.newJob(MyJob.class) + .withIdentity(key) + .build(); + + try { + scheduler.scheduleJob(job, trigger); + // wait for job to start executing, then interrupt + EXECUTE_LATCH.await(2, TimeUnit.SECONDS); + scheduler.interrupt(key); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + + assertTrue(INTERRUPT_LATCH.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testNonInterruptableJob() throws InterruptedException { + + String jobKey = "myNonInterruptableJob"; + JobKey key = new JobKey(jobKey); + Trigger trigger = TriggerBuilder.newTrigger() + .startNow() + .build(); + + JobDetail job = JobBuilder.newJob(MyNonInterruptableJob.class) + .withIdentity(key) + .build(); + + try { + scheduler.scheduleJob(job, trigger); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + + // wait for job to start executing, then interrupt + NON_INTERRUPTABLE_EXECUTE_LATCH.await(2, TimeUnit.SECONDS); + try { + scheduler.interrupt(key); + fail("Should have thrown UnableToInterruptJobException"); + } catch (UnableToInterruptJobException e) { + // This is expected, release the latch holding the job + NON_INTERRUPTABLE_HOLD_LATCH.countDown(); + } + } + + @ApplicationScoped + static class MyJob implements InterruptableJob { + + @Override + public void execute(JobExecutionContext context) { + EXECUTE_LATCH.countDown(); + try { + // halt execution so that we can interrupt it + INTERRUPT_LATCH.await(4, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void interrupt() { + INTERRUPT_LATCH.countDown(); + } + } + + @ApplicationScoped + static class MyNonInterruptableJob implements Job { + + @Override + public void execute(JobExecutionContext context) { + NON_INTERRUPTABLE_EXECUTE_LATCH.countDown(); + try { + // halt execution so that we can interrupt it + NON_INTERRUPTABLE_HOLD_LATCH.await(4, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java index 23f4065234906..4e02136078ef9 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/CdiAwareJob.java @@ -3,10 +3,12 @@ import jakarta.enterprise.context.Dependent; import jakarta.enterprise.inject.Instance; +import org.quartz.InterruptableJob; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.Scheduler; +import org.quartz.UnableToInterruptJobException; import org.quartz.spi.TriggerFiredBundle; /** @@ -15,7 +17,7 @@ * trigger. * We will therefore create a new dependent bean for every trigger and destroy it afterwards. */ -class CdiAwareJob implements Job { +class CdiAwareJob implements InterruptableJob { private final Instance jobInstance; @@ -34,4 +36,22 @@ public void execute(JobExecutionContext context) throws JobExecutionException { } } } + + @Override + public void interrupt() throws UnableToInterruptJobException { + Instance.Handle handle = jobInstance.getHandle(); + // delegate if possible; throw an exception in other cases + if (InterruptableJob.class.isAssignableFrom(handle.getBean().getBeanClass())) { + try { + ((InterruptableJob) handle.get()).interrupt(); + } finally { + if (handle.getBean().getScope().equals(Dependent.class)) { + handle.destroy(); + } + } + } else { + throw new UnableToInterruptJobException("Job " + handle.getBean().getBeanClass() + + " can not be interrupted, since it does not implement " + InterruptableJob.class.getName()); + } + } }