diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java index 0f6c35cfad73bf..2a5b0d77bdf87f 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/ScheduledBusinessMethodItem.java @@ -11,15 +11,21 @@ public final class ScheduledBusinessMethodItem extends MultiBuildItem { private final BeanInfo bean; - private final List schedules; - private final MethodInfo method; + private final boolean nonBlocking; public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List schedules) { + this(bean, method, schedules, false); + } + + public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List schedules, + boolean hasNonBlockingAnnotation) { this.bean = bean; this.method = method; this.schedules = schedules; + this.nonBlocking = hasNonBlockingAnnotation || SchedulerDotNames.COMPLETION_STAGE.equals(method.returnType().name()) + || SchedulerDotNames.UNI.equals(method.returnType().name()); } /** @@ -38,4 +44,8 @@ public List getSchedules() { return schedules; } + public boolean isNonBlocking() { + return nonBlocking; + } + } diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerDotNames.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerDotNames.java new file mode 100644 index 00000000000000..77ac014f0216fa --- /dev/null +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerDotNames.java @@ -0,0 +1,21 @@ +package io.quarkus.scheduler.deployment; + +import java.util.concurrent.CompletionStage; + +import org.jboss.jandex.DotName; + +import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.NonBlocking; + +class SchedulerDotNames { + + static final DotName SCHEDULED_NAME = DotName.createSimple(Scheduled.class.getName()); + static final DotName SCHEDULES_NAME = DotName.createSimple(Scheduled.Schedules.class.getName()); + static final DotName SKIP_NEVER_NAME = DotName.createSimple(Scheduled.Never.class.getName()); + static final DotName SKIP_PREDICATE = DotName.createSimple(Scheduled.SkipPredicate.class.getName()); + static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName()); + static final DotName UNI = DotName.createSimple("io.smallrye.mutiny.Uni"); + static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName()); + static final DotName VOID = DotName.createSimple(Void.class.getName()); + +} diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java index 8cbbe7aa318a32..1b85848ee28d0e 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java @@ -14,6 +14,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; import org.jboss.jandex.AnnotationInstance; @@ -66,17 +68,20 @@ import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem; import io.quarkus.devconsole.spi.DevConsoleRouteBuildItem; import io.quarkus.devconsole.spi.DevConsoleRuntimeTemplateInfoBuildItem; +import io.quarkus.gizmo.CatchBlockCreator; import io.quarkus.gizmo.ClassCreator; import io.quarkus.gizmo.ClassOutput; +import io.quarkus.gizmo.DescriptorUtils; import io.quarkus.gizmo.MethodCreator; import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.gizmo.ResultHandle; +import io.quarkus.gizmo.TryBlock; import io.quarkus.runtime.metrics.MetricsFactory; import io.quarkus.runtime.util.HashUtil; import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.ScheduledExecution; import io.quarkus.scheduler.Scheduler; -import io.quarkus.scheduler.runtime.ScheduledInvoker; +import io.quarkus.scheduler.runtime.DefaultInvoker; import io.quarkus.scheduler.runtime.ScheduledMethodMetadata; import io.quarkus.scheduler.runtime.SchedulerConfig; import io.quarkus.scheduler.runtime.SchedulerContext; @@ -89,11 +94,6 @@ public class SchedulerProcessor { private static final Logger LOGGER = Logger.getLogger(SchedulerProcessor.class); - static final DotName SCHEDULED_NAME = DotName.createSimple(Scheduled.class.getName()); - static final DotName SCHEDULES_NAME = DotName.createSimple(Scheduled.Schedules.class.getName()); - static final DotName SKIP_NEVER_NAME = DotName.createSimple(Scheduled.Never.class.getName()); - static final DotName SKIP_PREDICATE = DotName.createSimple(Scheduled.SkipPredicate.class.getName()); - static final Type SCHEDULED_EXECUTION_TYPE = Type.create(DotName.createSimple(ScheduledExecution.class.getName()), Kind.CLASS); @@ -112,7 +112,8 @@ AutoAddScopeBuildItem autoAddScope() { // We add @Singleton to any bean class that has no scope annotation and declares at least one non-static method annotated with @Scheduled return AutoAddScopeBuildItem.builder() .anyMethodMatches(m -> !Modifier.isStatic(m.flags()) - && (m.hasAnnotation(SCHEDULED_NAME) || m.hasAnnotation(SCHEDULES_NAME))) + && (m.hasAnnotation(SchedulerDotNames.SCHEDULED_NAME) + || m.hasAnnotation(SchedulerDotNames.SCHEDULES_NAME))) .defaultScope(BuiltinScope.SINGLETON) .reason("Found non-static scheduled business methods").build(); } @@ -123,8 +124,9 @@ void collectScheduledMethods(BeanArchiveIndexBuildItem beanArchives, BeanDiscove BuildProducer scheduledBusinessMethods) { // First collect static scheduled methods - List schedules = new ArrayList<>(beanArchives.getIndex().getAnnotations(SCHEDULED_NAME)); - for (AnnotationInstance annotationInstance : beanArchives.getIndex().getAnnotations(SCHEDULES_NAME)) { + List schedules = new ArrayList<>( + beanArchives.getIndex().getAnnotations(SchedulerDotNames.SCHEDULED_NAME)); + for (AnnotationInstance annotationInstance : beanArchives.getIndex().getAnnotations(SchedulerDotNames.SCHEDULES_NAME)) { for (AnnotationInstance scheduledInstance : annotationInstance.value().asNestedArray()) { // We need to set the target of the containing instance schedules.add(AnnotationInstance.create(scheduledInstance.name(), annotationInstance.target(), @@ -137,7 +139,8 @@ void collectScheduledMethods(BeanArchiveIndexBuildItem beanArchives, BeanDiscove } MethodInfo method = annotationInstance.target().asMethod(); if (Modifier.isStatic(method.flags())) { - scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(null, method, schedules)); + scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(null, method, schedules, + transformedAnnotations.getAnnotation(method, SchedulerDotNames.NON_BLOCKING) != null)); LOGGER.debugf("Found scheduled static method %s declared on %s", method, method.declaringClass().name()); } } @@ -159,11 +162,13 @@ private void collectScheduledMethods(IndexView index, TransformedAnnotationsBuil continue; } List schedules = null; - AnnotationInstance scheduledAnnotation = transformedAnnotations.getAnnotation(method, SCHEDULED_NAME); + AnnotationInstance scheduledAnnotation = transformedAnnotations.getAnnotation(method, + SchedulerDotNames.SCHEDULED_NAME); if (scheduledAnnotation != null) { schedules = List.of(scheduledAnnotation); } else { - AnnotationInstance schedulesAnnotation = transformedAnnotations.getAnnotation(method, SCHEDULES_NAME); + AnnotationInstance schedulesAnnotation = transformedAnnotations.getAnnotation(method, + SchedulerDotNames.SCHEDULES_NAME); if (schedulesAnnotation != null) { schedules = new ArrayList<>(); for (AnnotationInstance scheduledInstance : schedulesAnnotation.value().asNestedArray()) { @@ -174,7 +179,8 @@ private void collectScheduledMethods(IndexView index, TransformedAnnotationsBuil } } if (schedules != null) { - scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(bean, method, schedules)); + scheduledBusinessMethods.produce(new ScheduledBusinessMethodItem(bean, method, schedules, + transformedAnnotations.getAnnotation(method, SchedulerDotNames.NON_BLOCKING) != null)); LOGGER.debugf("Found scheduled business method %s declared on %s", method, bean); } } @@ -206,9 +212,10 @@ void validateScheduledBusinessMethods(SchedulerConfig config, List or Uni [method: %s, bean: %s]", method, scheduledMethod.getBean()))); } // Validate cron() and every() expressions @@ -226,11 +233,26 @@ void validateScheduledBusinessMethods(SchedulerConfig config, List unremovableBeans() { // Beans annotated with @Scheduled should never be removed - return List.of(new UnremovableBeanBuildItem(new BeanClassAnnotationExclusion(SCHEDULED_NAME)), - new UnremovableBeanBuildItem(new BeanClassAnnotationExclusion(SCHEDULES_NAME))); + return List.of(new UnremovableBeanBuildItem(new BeanClassAnnotationExclusion(SchedulerDotNames.SCHEDULED_NAME)), + new UnremovableBeanBuildItem(new BeanClassAnnotationExclusion(SchedulerDotNames.SCHEDULES_NAME))); } @BuildStep @@ -301,7 +323,7 @@ public AnnotationsTransformerBuildItem metrics(SchedulerConfig config, return new AnnotationsTransformerBuildItem(AnnotationsTransformer.builder() .appliesTo(METHOD) - .whenContainsAny(List.of(SCHEDULED_NAME, SCHEDULES_NAME)) + .whenContainsAny(List.of(SchedulerDotNames.SCHEDULED_NAME, SchedulerDotNames.SCHEDULES_NAME)) .whenContainsNone(List.of(micrometerTimed, mpTimed, DotName.createSimple("org.eclipse.microprofile.metrics.annotation.SimplyTimed"))) .transform(context -> { @@ -357,55 +379,80 @@ private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, Clas + HashUtil.sha1(sigBuilder.toString()); ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName) - .interfaces(ScheduledInvoker.class) - .build(); + .superClass(DefaultInvoker.class).build(); - // The descriptor is: void invokeBean(Object execution) - MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", void.class, Object.class) + // The descriptor is: CompletionStage invoke(ScheduledExecution execution) + MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", CompletionStage.class, ScheduledExecution.class) .addException(Exception.class); + // Use a try-catch block and return failed future if an exception is thrown + TryBlock tryBlock = invoke.tryBlock(); + CatchBlockCreator catchBlock = tryBlock.addCatch(Throwable.class); + catchBlock.returnValue(catchBlock.invokeStaticMethod( + MethodDescriptor.ofMethod(CompletableFuture.class, "failedStage", CompletionStage.class, Throwable.class), + catchBlock.getCaughtException())); + + String returnTypeStr = DescriptorUtils.typeToString(method.returnType()); + ResultHandle res; if (isStatic) { if (method.parameters().isEmpty()) { - invoke.invokeStaticMethod( - MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), void.class)); + res = tryBlock.invokeStaticMethod( + MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), returnTypeStr)); } else { - invoke.invokeStaticMethod( - MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), void.class, + res = tryBlock.invokeStaticMethod( + MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), returnTypeStr, ScheduledExecution.class), - invoke.getMethodParam(0)); + tryBlock.getMethodParam(0)); } } else { - // InjectableBean bean = Arc.container().bean("foo1"); // InstanceHandle handle = Arc.container().instance(bean); // handle.get().ping(); - ResultHandle containerHandle = invoke + ResultHandle containerHandle = tryBlock .invokeStaticMethod(MethodDescriptor.ofMethod(Arc.class, "container", ArcContainer.class)); - ResultHandle beanHandle = invoke.invokeInterfaceMethod( + ResultHandle beanHandle = tryBlock.invokeInterfaceMethod( MethodDescriptor.ofMethod(ArcContainer.class, "bean", InjectableBean.class, String.class), - containerHandle, invoke.load(bean.getIdentifier())); - ResultHandle instanceHandle = invoke.invokeInterfaceMethod( + containerHandle, tryBlock.load(bean.getIdentifier())); + ResultHandle instanceHandle = tryBlock.invokeInterfaceMethod( MethodDescriptor.ofMethod(ArcContainer.class, "instance", InstanceHandle.class, InjectableBean.class), containerHandle, beanHandle); - ResultHandle beanInstanceHandle = invoke + ResultHandle beanInstanceHandle = tryBlock .invokeInterfaceMethod(MethodDescriptor.ofMethod(InstanceHandle.class, "get", Object.class), instanceHandle); if (method.parameters().isEmpty()) { - invoke.invokeVirtualMethod( - MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), void.class), + res = tryBlock.invokeVirtualMethod( + MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), returnTypeStr), beanInstanceHandle); } else { - invoke.invokeVirtualMethod( - MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), void.class, + res = tryBlock.invokeVirtualMethod( + MethodDescriptor.ofMethod(implClazz.name().toString(), method.name(), returnTypeStr, ScheduledExecution.class), - beanInstanceHandle, invoke.getMethodParam(0)); + beanInstanceHandle, tryBlock.getMethodParam(0)); } // handle.destroy() - destroy dependent instance afterwards if (BuiltinScope.DEPENDENT.is(bean.getScope())) { - invoke.invokeInterfaceMethod(MethodDescriptor.ofMethod(InstanceHandle.class, "destroy", void.class), + tryBlock.invokeInterfaceMethod(MethodDescriptor.ofMethod(InstanceHandle.class, "destroy", void.class), instanceHandle); } } - invoke.returnValue(null); + + if (res == null) { + // If the return type is void then return a new completed stage + res = tryBlock.invokeStaticMethod( + MethodDescriptor.ofMethod(CompletableFuture.class, "completedStage", CompletionStage.class, Object.class), + tryBlock.loadNull()); + } else if (method.returnType().name().equals(SchedulerDotNames.UNI)) { + // Subscribe to the returned Uni + res = tryBlock.invokeInterfaceMethod(MethodDescriptor.ofMethod(SchedulerDotNames.UNI.toString(), + "subscribeAsCompletionStage", CompletableFuture.class), res); + } + + tryBlock.returnValue(res); + + if (scheduledMethod.isNonBlocking()) { + MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class); + isBlocking.returnValue(isBlocking.load(false)); + } invokerCreator.close(); return generatedName.replace('/', '.'); @@ -490,7 +537,7 @@ private Throwable validateScheduled(CronParser parser, AnnotationInstance schedu AnnotationValue skipExecutionIfValue = schedule.value("skipExecutionIf"); if (skipExecutionIfValue != null) { DotName skipPredicate = skipExecutionIfValue.asClass().name(); - if (!SKIP_NEVER_NAME.equals(skipPredicate) + if (!SchedulerDotNames.SKIP_NEVER_NAME.equals(skipPredicate) && validationContext.beans().withBeanType(skipPredicate).collect().size() != 1) { String message = String.format("There must be exactly one bean that matches the skip predicate: \"%s\" on: %s", skipPredicate, schedule); @@ -503,7 +550,7 @@ private Throwable validateScheduled(CronParser parser, AnnotationInstance schedu @BuildStep UnremovableBeanBuildItem unremoveableSkipPredicates() { - return new UnremovableBeanBuildItem(new UnremovableBeanBuildItem.BeanTypeExclusion(SKIP_PREDICATE)); + return new UnremovableBeanBuildItem(new UnremovableBeanBuildItem.BeanTypeExclusion(SchedulerDotNames.SKIP_PREDICATE)); } } diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/nonblocking/NonBlockingScheduledMethodTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/nonblocking/NonBlockingScheduledMethodTest.java new file mode 100644 index 00000000000000..195365a6fee2bb --- /dev/null +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/nonblocking/NonBlockingScheduledMethodTest.java @@ -0,0 +1,131 @@ +package io.quarkus.scheduler.test.nonblocking; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.enterprise.event.Observes; +import javax.inject.Singleton; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.ScheduledExecution; +import io.quarkus.scheduler.SuccessfulExecution; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.annotation.NonBlocking; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; + +public class NonBlockingScheduledMethodTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class)); + + @Test + public void testVoid() throws InterruptedException { + assertTrue(Jobs.VOID_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.VOID_ON_EVENT_LOOP.get()); + assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS)); + assertEvents("every_void"); + } + + @Test + public void testUni() throws InterruptedException { + assertTrue(Jobs.UNI_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.UNI_ON_EVENT_LOOP.get()); + assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS)); + assertEvents("every_uni"); + } + + @Test + public void testCompletionStage() throws InterruptedException { + assertTrue(Jobs.CS_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.CS_ON_EVENT_LOOP.get()); + assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS)); + assertEvents("every_cs"); + } + + private void assertEvents(String id) { + for (SuccessfulExecution exec : Jobs.events) { + if (exec.getExecution().getTrigger().getId().equals(id)) { + return; + } + } + fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.events); + } + + static class Jobs { + + // jobs executed + static final CountDownLatch VOID_LATCH = new CountDownLatch(1); + static final CountDownLatch UNI_LATCH = new CountDownLatch(1); + static final CountDownLatch CS_LATCH = new CountDownLatch(1); + + // jobs executed on the event loop + static final AtomicBoolean VOID_ON_EVENT_LOOP = new AtomicBoolean(); + static final AtomicBoolean UNI_ON_EVENT_LOOP = new AtomicBoolean(); + static final AtomicBoolean CS_ON_EVENT_LOOP = new AtomicBoolean(); + + // sucessfull events + static final CountDownLatch SUCCESS_LATCH = new CountDownLatch(3); + static final List events = new CopyOnWriteArrayList<>(); + + static void onSuccess(@Observes SuccessfulExecution event) { + events.add(event); + SUCCESS_LATCH.countDown(); + } + + @NonBlocking + @Scheduled(every = "0.5s", identity = "every_void", skipExecutionIf = JobWasExecuted.class) + void everySecond() { + VOID_ON_EVENT_LOOP.set(Context.isOnEventLoopThread()); + VOID_LATCH.countDown(); + } + + @Scheduled(every = "0.5s", identity = "every_uni", skipExecutionIf = JobWasExecuted.class) + Uni everySecondUni() { + UNI_ON_EVENT_LOOP.set(Context.isOnEventLoopThread()); + UNI_LATCH.countDown(); + return Uni.createFrom().voidItem(); + } + + @Scheduled(every = "0.5s", identity = "every_cs", skipExecutionIf = JobWasExecuted.class) + CompletionStage everySecondCompletionStage() { + CompletableFuture ret = new CompletableFuture(); + CS_ON_EVENT_LOOP.set(Context.isOnEventLoopThread()); + CS_LATCH.countDown(); + ret.complete(null); + return ret; + } + } + + @Singleton + static class JobWasExecuted implements Scheduled.SkipPredicate { + + @Override + public boolean test(ScheduledExecution execution) { + switch (execution.getTrigger().getId()) { + case "every_void": + return Jobs.VOID_LATCH.getCount() == 0; + case "every_uni": + return Jobs.UNI_LATCH.getCount() == 0; + case "every_cs": + return Jobs.CS_LATCH.getCount() == 0; + default: + return false; + } + } + + } + +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/Scheduled.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/Scheduled.java index d2f0aaa4de6919..a3af0609b9e8de 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/Scheduled.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/Scheduled.java @@ -33,8 +33,13 @@ * } * * - * The annotated method must return {@code void} and either declare no parameters or one parameter of type + * The annotated method must return {@code void}, {@code java.util.concurrent.CompletionStage} or + * {@code io.smallrye.mutiny.Uni} and either declare no parameters or one parameter of type * {@link ScheduledExecution}. + *

+ * By default, a scheduled method is executed on the main executor for blocking tasks. However, a scheduled method that returns + * {@code java.util.concurrent.CompletionStage} or {@code io.smallrye.mutiny.Uni}, or is annotated with + * {@link io.smallrye.common.annotation.NonBlocking} is executed on the event loop. * * @see ScheduledExecution */ diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/DefaultInvoker.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/DefaultInvoker.java new file mode 100644 index 00000000000000..06d07e7e460eab --- /dev/null +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/DefaultInvoker.java @@ -0,0 +1,28 @@ +package io.quarkus.scheduler.runtime; + +import java.util.concurrent.CompletionStage; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.quarkus.scheduler.ScheduledExecution; + +public abstract class DefaultInvoker implements ScheduledInvoker { + + @Override + public CompletionStage invoke(ScheduledExecution execution) throws Exception { + ManagedContext requestContext = Arc.container().requestContext(); + if (requestContext.isActive()) { + return invokeBean(execution); + } else { + try { + requestContext.activate(); + return invokeBean(execution); + } finally { + requestContext.terminate(); + } + } + } + + protected abstract CompletionStage invokeBean(ScheduledExecution execution) throws Exception; + +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/DelegateInvoker.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/DelegateInvoker.java new file mode 100644 index 00000000000000..725b92147d7492 --- /dev/null +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/DelegateInvoker.java @@ -0,0 +1,16 @@ +package io.quarkus.scheduler.runtime; + +abstract class DelegateInvoker implements ScheduledInvoker { + + protected final ScheduledInvoker delegate; + + public DelegateInvoker(ScheduledInvoker delegate) { + this.delegate = delegate; + } + + @Override + public boolean isBlocking() { + return delegate.isBlocking(); + } + +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/ScheduledInvoker.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/ScheduledInvoker.java index f248059d4e539f..1e775de35d784a 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/ScheduledInvoker.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/ScheduledInvoker.java @@ -1,11 +1,30 @@ package io.quarkus.scheduler.runtime; -import io.quarkus.arc.runtime.BeanInvoker; +import java.util.concurrent.CompletionStage; + import io.quarkus.scheduler.ScheduledExecution; /** * Invokes a scheduled business method of a bean. */ -public interface ScheduledInvoker extends BeanInvoker { +public interface ScheduledInvoker { + + /** + * + * @param execution + * @return the result + * @throws Exception + */ + CompletionStage invoke(ScheduledExecution execution) throws Exception; + + /** + * A blocking invoker is executed on the main executor for blocking tasks. + * A non-blocking invoker is executed on the event loop. + * + * @return {@code true} if the scheduled method is blocking, {@code false} otherwise + */ + default boolean isBlocking() { + return true; + } } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index 57ef89c510b77c..d92768d0db5e0f 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -45,6 +45,8 @@ import io.quarkus.scheduler.SuccessfulExecution; import io.quarkus.scheduler.Trigger; import io.quarkus.scheduler.runtime.util.SchedulerUtils; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; @Typed(Scheduler.class) @Singleton @@ -57,17 +59,19 @@ public class SimpleScheduler implements Scheduler { private final ScheduledExecutorService scheduledExecutor; private final ExecutorService executor; + private final Vertx vertx; private volatile boolean running; private final List scheduledTasks; private final boolean enabled; public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedulerRuntimeConfig, Event skippedExecutionEvent, Event successExecutionEvent, - Event failedExecutionEvent) { + Event failedExecutionEvent, Vertx vertx) { this.running = true; this.enabled = schedulerRuntimeConfig.enabled; this.scheduledTasks = new ArrayList<>(); this.executor = context.getExecutor(); + this.vertx = vertx; if (!schedulerRuntimeConfig.enabled) { this.scheduledExecutor = null; @@ -142,7 +146,7 @@ void checkTriggers() { ZonedDateTime now = ZonedDateTime.now(); LOG.tracef("Check triggers at %s", now); for (ScheduledTask task : scheduledTasks) { - task.execute(now, executor); + task.execute(now, executor, vertx); } } @@ -288,29 +292,42 @@ static class ScheduledTask { this.invoker = invoker; } - void execute(ZonedDateTime now, ExecutorService executor) { + void execute(ZonedDateTime now, ExecutorService executor, Vertx vertx) { if (!trigger.isRunning()) { return; } ZonedDateTime scheduledFireTime = trigger.evaluate(now); if (scheduledFireTime != null) { - try { - executor.execute(new Runnable() { - @Override - public void run() { - try { - invoker.invoke(new SimpleScheduledExecution(now, scheduledFireTime, trigger)); - } catch (Throwable t) { - // already logged by the StatusEmitterInvoker + if (invoker.isBlocking()) { + try { + executor.execute(new Runnable() { + @Override + public void run() { + doInvoke(now, scheduledFireTime); } + }); + } catch (RejectedExecutionException e) { + LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger); + } + } else { + vertx.runOnContext(new Handler() { + @Override + public void handle(Void event) { + doInvoke(now, scheduledFireTime); } }); - } catch (RejectedExecutionException e) { - LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger); } } } + void doInvoke(ZonedDateTime now, ZonedDateTime scheduledFireTime) { + try { + invoker.invoke(new SimpleScheduledExecution(now, scheduledFireTime, trigger)); + } catch (Throwable t) { + // already logged by the StatusEmitterInvoker + } + } + } static abstract class SimpleTrigger implements Trigger { diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java index 8b23fae7498f5f..06bc8a80aeba23 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipConcurrentExecutionInvoker.java @@ -1,5 +1,7 @@ package io.quarkus.scheduler.runtime; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import javax.enterprise.event.Event; @@ -11,45 +13,35 @@ import io.quarkus.scheduler.SkippedExecution; /** - * A scheduled invoker wrapper that skips concurrent executions. + * An invoker wrapper that skips concurrent executions. * * @see Scheduled#concurrentExecution() * @see io.quarkus.scheduler.Scheduled.ConcurrentExecution#SKIP */ -public final class SkipConcurrentExecutionInvoker implements ScheduledInvoker { +public final class SkipConcurrentExecutionInvoker extends DelegateInvoker { - private static final Logger LOGGER = Logger.getLogger(SkipConcurrentExecutionInvoker.class); + private static final Logger LOG = Logger.getLogger(SkipConcurrentExecutionInvoker.class); private final AtomicBoolean running; - private final ScheduledInvoker delegate; private final Event event; public SkipConcurrentExecutionInvoker(ScheduledInvoker delegate, Event event) { + super(delegate); this.running = new AtomicBoolean(false); - this.delegate = delegate; this.event = event; } @Override - public void invoke(ScheduledExecution execution) throws Exception { + public CompletionStage invoke(ScheduledExecution execution) throws Exception { if (running.compareAndSet(false, true)) { - try { - delegate.invoke(execution); - } finally { - running.set(false); - } - } else { - LOGGER.debugf("Skipped scheduled invoker execution: %s", delegate.getClass().getName()); - SkippedExecution payload = new SkippedExecution(execution, - "The scheduled method should not be executed concurrently"); - event.fire(payload); - event.fireAsync(payload); + return delegate.invoke(execution).whenComplete((r, t) -> running.set(false)); } - } - - @Override - public void invokeBean(ScheduledExecution param) { - throw new UnsupportedOperationException(); + LOG.debugf("Skipped scheduled invoker execution: %s", delegate.getClass().getName()); + SkippedExecution payload = new SkippedExecution(execution, + "The scheduled method should not be executed concurrently"); + event.fire(payload); + event.fireAsync(payload); + return CompletableFuture.completedStage(null); } } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipPredicateInvoker.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipPredicateInvoker.java index 47e5544853542c..c20478b5f7c949 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipPredicateInvoker.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SkipPredicateInvoker.java @@ -1,5 +1,8 @@ package io.quarkus.scheduler.runtime; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + import javax.enterprise.event.Event; import org.jboss.logging.Logger; @@ -13,37 +16,32 @@ * * @see Scheduled#skipExecutionIf() */ -public final class SkipPredicateInvoker implements ScheduledInvoker { +public final class SkipPredicateInvoker extends DelegateInvoker { - private static final Logger LOGGER = Logger.getLogger(SkipPredicateInvoker.class); + private static final Logger LOG = Logger.getLogger(SkipPredicateInvoker.class); - private final ScheduledInvoker delegate; private final Scheduled.SkipPredicate predicate; private final Event event; public SkipPredicateInvoker(ScheduledInvoker delegate, Scheduled.SkipPredicate predicate, Event event) { - this.delegate = delegate; + super(delegate); this.predicate = predicate; this.event = event; } @Override - public void invoke(ScheduledExecution execution) throws Exception { + public CompletionStage invoke(ScheduledExecution execution) throws Exception { if (predicate.test(execution)) { - LOGGER.debugf("Skipped scheduled invoker execution: %s", delegate.getClass().getName()); + LOG.debugf("Skipped scheduled invoker execution: %s", delegate.getClass().getName()); SkippedExecution payload = new SkippedExecution(execution, predicate.getClass().getName()); event.fire(payload); event.fireAsync(payload); + return CompletableFuture.completedStage(null); } else { - delegate.invoke(execution); + return delegate.invoke(execution); } } - @Override - public void invokeBean(ScheduledExecution param) { - throw new UnsupportedOperationException(); - } - } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/StatusEmitterInvoker.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/StatusEmitterInvoker.java index fdabed78ee693c..50367f5d5cdcc6 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/StatusEmitterInvoker.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/StatusEmitterInvoker.java @@ -1,56 +1,49 @@ package io.quarkus.scheduler.runtime; +import java.util.concurrent.CompletionStage; + import javax.enterprise.event.Event; import org.jboss.logging.Logger; import io.quarkus.scheduler.FailedExecution; -import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.ScheduledExecution; import io.quarkus.scheduler.SuccessfulExecution; /** - * A scheduled invoker wrapper that skips concurrent executions. + * An invoker wrapper that fires CDI events when an execution of a scheduled method is finished. * - * @see Scheduled#concurrentExecution() - * @see Scheduled.ConcurrentExecution#SKIP + * @see SuccessfulExecution + * @see FailedExecution */ -public final class StatusEmitterInvoker implements ScheduledInvoker { - - private static final Logger LOGGER = Logger.getLogger(StatusEmitterInvoker.class); +public final class StatusEmitterInvoker extends DelegateInvoker { - private final ScheduledInvoker delegate; - private final Event successfulExecutionEvent; - private final Event failedExecutionEvent; + private static final Logger LOG = Logger.getLogger(StatusEmitterInvoker.class); - public StatusEmitterInvoker(ScheduledInvoker delegate, Event successfulExecutionEvent, - Event failedExecutionEvent) { - this.delegate = delegate; - this.successfulExecutionEvent = successfulExecutionEvent; - this.failedExecutionEvent = failedExecutionEvent; - } + private final Event successfulEvent; + private final Event failedEvent; - @Override - public void invoke(ScheduledExecution execution) throws Exception { - - try { - delegate.invoke(execution); - SuccessfulExecution successExecution = new SuccessfulExecution(execution); - successfulExecutionEvent.fireAsync(successExecution); - successfulExecutionEvent.fire(successExecution); - } catch (Throwable t) { - LOGGER.errorf(t, "Error occured while executing task for trigger %s", execution.getTrigger()); - FailedExecution failedExecution = new FailedExecution(execution, t); - failedExecutionEvent.fireAsync(failedExecution); - failedExecutionEvent.fire(failedExecution); - // rethrow for quartz job listeners - throw t; - } + public StatusEmitterInvoker(ScheduledInvoker delegate, Event successfulEvent, + Event failedEvent) { + super(delegate); + this.successfulEvent = successfulEvent; + this.failedEvent = failedEvent; } @Override - public void invokeBean(ScheduledExecution param) { - throw new UnsupportedOperationException(); + public CompletionStage invoke(ScheduledExecution execution) throws Exception { + return delegate.invoke(execution).whenComplete((v, t) -> { + if (t != null) { + LOG.errorf(t, "Error occured while executing task for trigger %s", execution.getTrigger()); + FailedExecution failed = new FailedExecution(execution, t); + failedEvent.fireAsync(failed); + failedEvent.fire(failed); + } else { + SuccessfulExecution success = new SuccessfulExecution(execution); + successfulEvent.fireAsync(success); + successfulEvent.fire(success); + } + }); } }