Skip to content

Commit

Permalink
Scheduler - support non-blocking scheduled methods
Browse files Browse the repository at this point in the history
- resolves quarkusio#24621
  • Loading branch information
mkouba committed Mar 31, 2022
1 parent c6c2788 commit 739d250
Show file tree
Hide file tree
Showing 17 changed files with 591 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package io.quarkus.quartz.test.nonblocking;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.enterprise.event.Observes;
import javax.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.SuccessfulExecution;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;

public class NonBlockingScheduledMethodTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class));

@Test
public void testVoid() throws InterruptedException {
assertTrue(Jobs.VOID_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.VOID_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_void");
}

@Test
public void testUni() throws InterruptedException {
assertTrue(Jobs.UNI_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.UNI_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_uni");
}

@Test
public void testCompletionStage() throws InterruptedException {
assertTrue(Jobs.CS_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.CS_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_cs");
}

private void assertEvents(String id) {
for (SuccessfulExecution exec : Jobs.events) {
if (exec.getExecution().getTrigger().getId().equals(id)) {
return;
}
}
fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.events);
}

static class Jobs {

// jobs executed
static final CountDownLatch VOID_LATCH = new CountDownLatch(1);
static final CountDownLatch UNI_LATCH = new CountDownLatch(1);
static final CountDownLatch CS_LATCH = new CountDownLatch(1);

// jobs executed on the event loop
static final AtomicBoolean VOID_ON_EVENT_LOOP = new AtomicBoolean();
static final AtomicBoolean UNI_ON_EVENT_LOOP = new AtomicBoolean();
static final AtomicBoolean CS_ON_EVENT_LOOP = new AtomicBoolean();

// sucessfull events
static final CountDownLatch SUCCESS_LATCH = new CountDownLatch(3);
static final List<SuccessfulExecution> events = new CopyOnWriteArrayList<>();

static void onSuccess(@Observes SuccessfulExecution event) {
events.add(event);
SUCCESS_LATCH.countDown();
}

@NonBlocking
@Scheduled(every = "0.5s", identity = "every_void", skipExecutionIf = JobWasExecuted.class)
void everySecond() {
VOID_ON_EVENT_LOOP.set(Context.isOnEventLoopThread());
VOID_LATCH.countDown();
}

@Scheduled(every = "0.5s", identity = "every_uni", skipExecutionIf = JobWasExecuted.class)
Uni<Void> everySecondUni() {
UNI_ON_EVENT_LOOP.set(Context.isOnEventLoopThread());
UNI_LATCH.countDown();
return Uni.createFrom().voidItem();
}

@Scheduled(every = "0.5s", identity = "every_cs", skipExecutionIf = JobWasExecuted.class)
CompletionStage<Void> everySecondCompletionStage() {
CompletableFuture<Void> ret = new CompletableFuture<Void>();
CS_ON_EVENT_LOOP.set(Context.isOnEventLoopThread());
CS_LATCH.countDown();
ret.complete(null);
return ret;
}
}

@Singleton
static class JobWasExecuted implements Scheduled.SkipPredicate {

@Override
public boolean test(ScheduledExecution execution) {
switch (execution.getTrigger().getId()) {
case "every_void":
return Jobs.VOID_LATCH.getCount() == 0;
case "every_uni":
return Jobs.UNI_LATCH.getCount() == 0;
case "every_cs":
return Jobs.CS_LATCH.getCount() == 0;
default:
return false;
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
import io.quarkus.scheduler.runtime.SkipPredicateInvoker;
import io.quarkus.scheduler.runtime.StatusEmitterInvoker;
import io.quarkus.scheduler.runtime.util.SchedulerUtils;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

@Singleton
public class QuartzScheduler implements Scheduler {
Expand All @@ -87,7 +90,8 @@ public class QuartzScheduler implements Scheduler {

public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, SchedulerRuntimeConfig schedulerRuntimeConfig,
Event<SkippedExecution> skippedExecutionEvent, Event<SuccessfulExecution> successfulExecutionEvent,
Event<FailedExecution> failedExecutionEvent, Instance<Job> jobs, Instance<UserTransaction> userTransaction) {
Event<FailedExecution> failedExecutionEvent, Instance<Job> jobs, Instance<UserTransaction> userTransaction,
Vertx vertx) {
enabled = schedulerRuntimeConfig.enabled;
final Duration defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod;
final QuartzRuntimeConfig runtimeConfig = quartzSupport.getRuntimeConfig();
Expand Down Expand Up @@ -122,7 +126,7 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Sc
scheduler = schedulerFactory.getScheduler();

// Set custom job factory
scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs));
scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs, vertx));
CronType cronType = context.getCronType();
CronDefinition def = CronDefinitionBuilder.instanceDefinitionFor(cronType);
CronParser parser = new CronParser(def);
Expand Down Expand Up @@ -543,18 +547,33 @@ private Properties getAdditionalConfigurationProperties(String prefix, Map<Strin
static class InvokerJob implements Job {

final QuartzTrigger trigger;
final Vertx vertx;

InvokerJob(QuartzTrigger trigger) {
InvokerJob(QuartzTrigger trigger, Vertx vertx) {
this.trigger = trigger;
this.vertx = vertx;
}

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
if (trigger.invoker != null) { // could be null from previous runs
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, context));
} catch (Exception e) {
throw new JobExecutionException(e);
if (trigger.invoker.isBlocking()) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, context));
} catch (Exception e) {
throw new JobExecutionException(e);
}
} else {
VertxContext.getOrCreateDuplicatedContext(vertx).runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, context));
} catch (Exception e) {
// already logged by the StatusEmitterInvoker
}
}
});
}
}
}
Expand Down Expand Up @@ -632,10 +651,12 @@ static class InvokerJobFactory extends SimpleJobFactory {

final Map<String, QuartzTrigger> scheduledTasks;
final Instance<Job> jobs;
final Vertx vertx;

InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs) {
InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx) {
this.scheduledTasks = scheduledTasks;
this.jobs = jobs;
this.vertx = vertx;
}

@SuppressWarnings("unchecked")
Expand All @@ -644,7 +665,7 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr
Class<? extends Job> jobClass = bundle.getJobDetail().getJobClass();

if (jobClass.equals(InvokerJob.class)) {
return new InvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()));
return new InvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()), vertx);
}
if (Subclass.class.isAssignableFrom(jobClass)) {
// Get the original class from an intercepted bean class
Expand Down
4 changes: 2 additions & 2 deletions extensions/scheduler/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core-deployment</artifactId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@
public final class ScheduledBusinessMethodItem extends MultiBuildItem {

private final BeanInfo bean;

private final List<AnnotationInstance> schedules;

private final MethodInfo method;
private final boolean nonBlocking;

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules) {
this(bean, method, schedules, false);
}

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules,
boolean hasNonBlockingAnnotation) {
this.bean = bean;
this.method = method;
this.schedules = schedules;
this.nonBlocking = hasNonBlockingAnnotation || SchedulerDotNames.COMPLETION_STAGE.equals(method.returnType().name())
|| SchedulerDotNames.UNI.equals(method.returnType().name());
}

/**
Expand All @@ -38,4 +44,8 @@ public List<AnnotationInstance> getSchedules() {
return schedules;
}

public boolean isNonBlocking() {
return nonBlocking;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.scheduler.deployment;

import java.util.concurrent.CompletionStage;

import org.jboss.jandex.DotName;

import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.NonBlocking;

class SchedulerDotNames {

static final DotName SCHEDULED_NAME = DotName.createSimple(Scheduled.class.getName());
static final DotName SCHEDULES_NAME = DotName.createSimple(Scheduled.Schedules.class.getName());
static final DotName SKIP_NEVER_NAME = DotName.createSimple(Scheduled.Never.class.getName());
static final DotName SKIP_PREDICATE = DotName.createSimple(Scheduled.SkipPredicate.class.getName());
static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName());
static final DotName UNI = DotName.createSimple("io.smallrye.mutiny.Uni");
static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName());
static final DotName VOID = DotName.createSimple(Void.class.getName());

}
Loading

0 comments on commit 739d250

Please sign in to comment.