diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java index 0c7f8cab98c4d..2723bf9761262 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java @@ -2,14 +2,11 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; -import java.util.concurrent.ExecutorService; import com.cronutils.model.CronType; public interface SchedulerContext { - ExecutorService getExecutor(); - CronType getCronType(); List getScheduledMethods(); diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java index 3fcf66bbf4c12..8171f6eb91dc4 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java @@ -61,7 +61,6 @@ import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.AnnotationProxyBuildItem; -import io.quarkus.deployment.builditem.ExecutorBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.GeneratedClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; @@ -277,7 +276,7 @@ public List unremovableBeans() { public FeatureBuildItem build(SchedulerConfig config, BuildProducer syntheticBeans, SchedulerRecorder recorder, List scheduledMethods, BuildProducer generatedClasses, BuildProducer reflectiveClass, - AnnotationProxyBuildItem annotationProxy, ExecutorBuildItem executor) { + AnnotationProxyBuildItem annotationProxy) { List scheduledMetadata = new ArrayList<>(); ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClasses, new Function() { @@ -311,7 +310,7 @@ public String apply(String name) { } syntheticBeans.produce(SyntheticBeanBuildItem.configure(SchedulerContext.class).setRuntimeInit() - .supplier(recorder.createContext(config, executor.getExecutorProxy(), scheduledMetadata)) + .supplier(recorder.createContext(config, scheduledMetadata)) .done()); return new FeatureBuildItem(Feature.SCHEDULER); diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/DuplicatedContextTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/DuplicatedContextTest.java new file mode 100644 index 0000000000000..7088bcc3d562c --- /dev/null +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/DuplicatedContextTest.java @@ -0,0 +1,81 @@ +package io.quarkus.scheduler.test; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.inject.Inject; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +/** + * Verifies that the @Scheduled method are called on a duplicated context. + */ +public class DuplicatedContextTest { + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyScheduledClass.class)); + + @Inject + MyScheduledClass scheduled; + + @Test + public void testBlocking() { + await() + .atMost(Duration.ofSeconds(3)) + .until(() -> scheduled.blockingCalled() > 0); + } + + @Test + public void testNonBlocking() { + await() + .atMost(Duration.ofSeconds(3)) + .until(() -> scheduled.nonBlockingCalled() > 0); + } + + public static class MyScheduledClass { + + private final AtomicInteger blockingCalled = new AtomicInteger(); + private final AtomicInteger nonBlockingCalled = new AtomicInteger(); + + @Scheduled(every = "1m") + public void blocking() { + Context context = Vertx.currentContext(); + Assertions.assertNotNull(context); + Assertions.assertTrue(VertxContext.isDuplicatedContext(context)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + + blockingCalled.incrementAndGet(); + } + + @Scheduled(every = "1m") + public Uni nonblocking() { + Context context = Vertx.currentContext(); + Assertions.assertNotNull(context); + Assertions.assertTrue(VertxContext.isDuplicatedContext(context)); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + + nonBlockingCalled.incrementAndGet(); + return Uni.createFrom().voidItem(); + } + + public int blockingCalled() { + return blockingCalled.get(); + } + + public int nonBlockingCalled() { + return nonBlockingCalled.get(); + } + } +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java index 76f68553b4110..47d46e29b8917 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java @@ -1,7 +1,6 @@ package io.quarkus.scheduler.runtime; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import com.cronutils.model.CronType; @@ -13,18 +12,13 @@ @Recorder public class SchedulerRecorder { - public Supplier createContext(SchedulerConfig config, ExecutorService executorService, + public Supplier createContext(SchedulerConfig config, List scheduledMethods) { return new Supplier() { @Override public Object get() { return new SchedulerContext() { - @Override - public ExecutorService getExecutor() { - return executorService; - } - @Override public CronType getCronType() { return config.cronType; diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index 40a2bf2af9dd9..588dabb3e03cd 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -10,8 +10,6 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -55,6 +53,7 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; @Typed(Scheduler.class) @@ -67,7 +66,6 @@ public class SimpleScheduler implements Scheduler { private static final long CHECK_PERIOD = 1000L; private final ScheduledExecutorService scheduledExecutor; - private final ExecutorService executor; private final Vertx vertx; private volatile boolean running; private final List scheduledTasks; @@ -79,7 +77,6 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule this.running = true; this.enabled = schedulerRuntimeConfig.enabled; this.scheduledTasks = new ArrayList<>(); - this.executor = context.getExecutor(); this.vertx = vertx; if (!schedulerRuntimeConfig.enabled) { @@ -155,7 +152,7 @@ void checkTriggers() { ZonedDateTime now = ZonedDateTime.now(); LOG.tracef("Check triggers at %s", now); for (ScheduledTask task : scheduledTasks) { - task.execute(now, executor, vertx); + task.execute(now, vertx); } } @@ -301,26 +298,26 @@ static class ScheduledTask { this.invoker = invoker; } - void execute(ZonedDateTime now, ExecutorService executor, Vertx vertx) { + void execute(ZonedDateTime now, Vertx vertx) { if (!trigger.isRunning()) { return; } ZonedDateTime scheduledFireTime = trigger.evaluate(now); if (scheduledFireTime != null) { + Context context = VertxContext.getOrCreateDuplicatedContext(vertx); + VertxContextSafetyToggle.setContextSafe(context, true); if (invoker.isBlocking()) { - try { - executor.execute(new Runnable() { - @Override - public void run() { + context.executeBlocking(new Handler>() { + @Override + public void handle(Promise p) { + try { doInvoke(now, scheduledFireTime); + } finally { + p.complete(); } - }); - } catch (RejectedExecutionException e) { - LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger); - } + } + }, false); } else { - Context context = VertxContext.getOrCreateDuplicatedContext(vertx); - VertxContextSafetyToggle.setContextSafe(context, true); context.runOnContext(new Handler() { @Override public void handle(Void event) { diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devconsole/SchedulerDevConsoleRecorder.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devconsole/SchedulerDevConsoleRecorder.java index 2f98d3737d4de..d5f5941a36dd1 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devconsole/SchedulerDevConsoleRecorder.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devconsole/SchedulerDevConsoleRecorder.java @@ -17,7 +17,9 @@ import io.quarkus.scheduler.common.runtime.ScheduledMethodMetadata; import io.quarkus.scheduler.common.runtime.SchedulerContext; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; +import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.Vertx; @@ -80,39 +82,36 @@ protected void handlePost(RoutingContext ctx, MultiMap form) throws Exception { SchedulerContext context = Arc.container().instance(SchedulerContext.class).get(); for (ScheduledMethodMetadata metadata : context.getScheduledMethods()) { if (metadata.getMethodDescription().equals(name)) { - context.getExecutor().execute(new Runnable() { - @Override - public void run() { - final ClassLoader previousCl = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(currentCl); - ScheduledInvoker invoker = context - .createInvoker(metadata.getInvokerClassName()); - if (invoker.isBlocking()) { + Vertx vertx = Arc.container().instance(Vertx.class).get(); + Context vdc = VertxContext.getOrCreateDuplicatedContext(vertx); + VertxContextSafetyToggle.setContextSafe(vdc, true); + try { + ScheduledInvoker invoker = context + .createInvoker(metadata.getInvokerClassName()); + if (invoker.isBlocking()) { + vdc.executeBlocking(p -> { + try { invoker.invoke(new DevModeScheduledExecution()); - } else { - Vertx vertx = Arc.container().instance(Vertx.class).get(); - VertxContext.getOrCreateDuplicatedContext(vertx).runOnContext(new Handler() { - @Override - public void handle(Void event) { - try { - invoker.invoke(new DevModeScheduledExecution()); - } catch (Exception ignored) { - } - } - }); + } catch (Exception ignored) { + } finally { + p.complete(); } - LOG.infof("Invoked scheduled method %s via Dev UI", name); - } catch (Exception e) { - LOG.error( - "Unable to invoke a @Scheduled method: " - + metadata.getMethodDescription(), - e); - } finally { - Thread.currentThread().setContextClassLoader(previousCl); - } + }, false); + } else { + vdc.runOnContext(x -> { + try { + invoker.invoke(new DevModeScheduledExecution()); + } catch (Exception ignored) { + } + }); } - }); + LOG.infof("Invoked scheduled method %s via Dev UI", name); + } catch (Exception e) { + LOG.error( + "Unable to invoke a @Scheduled method: " + + metadata.getMethodDescription(), + e); + } flashMessage(ctx, "Action invoked"); return; }